summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp17
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h7
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/client/file_reader.h14
-rw-r--r--yt/cpp/mapreduce/client/partition_reader.cpp16
-rw-r--r--yt/cpp/mapreduce/common/abortable_stream.cpp124
-rw-r--r--yt/cpp/mapreduce/common/abortable_stream.h21
-rw-r--r--yt/cpp/mapreduce/common/halting_stream.cpp67
-rw-r--r--yt/cpp/mapreduce/common/halting_stream.h15
-rw-r--r--yt/cpp/mapreduce/common/retry_lib.cpp2
-rw-r--r--yt/cpp/mapreduce/common/ya.make5
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.cpp27
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.h18
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp36
-rw-r--r--yt/cpp/mapreduce/http/http_client.h15
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.cpp8
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.h8
-rw-r--r--yt/cpp/mapreduce/interface/abortable_stream.cpp34
-rw-r--r--yt/cpp/mapreduce/interface/abortable_stream.h33
-rw-r--r--yt/cpp/mapreduce/interface/config.cpp4
-rw-r--r--yt/cpp/mapreduce/interface/config.h12
-rw-r--r--yt/cpp/mapreduce/interface/errors.h7
-rw-r--r--yt/cpp/mapreduce/interface/io-inl.h12
-rw-r--r--yt/cpp/mapreduce/interface/io.cpp10
-rw-r--r--yt/cpp/mapreduce/interface/io.h11
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h10
-rw-r--r--yt/cpp/mapreduce/interface/ya.make1
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.cpp10
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.h3
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.cpp4
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.cpp20
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.h2
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.h3
-rw-r--r--yt/cpp/mapreduce/io/stream_raw_reader.cpp10
-rw-r--r--yt/cpp/mapreduce/io/stream_table_reader.h17
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.cpp10
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.h2
-rw-r--r--yt/cpp/mapreduce/rpc_client/raw_client.cpp65
-rw-r--r--yt/cpp/mapreduce/rpc_client/raw_client.h8
-rw-r--r--yt/cpp/mapreduce/rpc_client/ya.make1
-rw-r--r--yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h5
46 files changed, 674 insertions, 84 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index 1eade18397a..0bbc6fa1322 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -84,6 +84,11 @@ bool TClientReader::Retry(
const TMaybe<ui64>& rowIndex,
const std::exception_ptr& error)
{
+ // We always stop retries if reader is aborted
+ if (IAbortableInputStream::IsAbortedError(error)) {
+ std::rethrow_exception(error);
+ }
+
if (CurrentRequestRetryPolicy_) {
TMaybe<TDuration> backoffDuration;
try {
@@ -124,6 +129,16 @@ void TClientReader::ResetRetries()
CurrentRequestRetryPolicy_ = nullptr;
}
+void TClientReader::Abort()
+{
+ Input_->Abort();
+}
+
+bool TClientReader::IsAborted() const
+{
+ return Input_->IsAborted();
+}
+
size_t TClientReader::DoRead(void* buf, size_t len)
{
return Input_->Read(buf, len);
@@ -178,7 +193,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
}
- Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ Input_ = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>(
CurrentRequestRetryPolicy_,
[this, &transactionId] (TMutationId /*mutationId*/) {
return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 9f3160c9c7d..19a9f1ad173 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -2,6 +2,8 @@
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/http.h>
@@ -37,6 +39,9 @@ public:
bool HasRangeIndices() const override { return true; }
+ void Abort() override;
+ bool IsAborted() const override;
+
protected:
size_t DoRead(void* buf, size_t len) override;
@@ -53,7 +58,7 @@ private:
std::unique_ptr<TPingableTransaction> ReadTransaction_;
- std::unique_ptr<IInputStream> Input_;
+ std::unique_ptr<IAbortableInputStream> Input_;
IRequestRetryPolicyPtr CurrentRequestRetryPolicy_;
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 6163dd55331..ffdd2c9ac30 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -50,6 +50,16 @@ TStreamReaderBase::TStreamReaderBase(
TStartTransactionOptions()))
{ }
+void TStreamReaderBase::Abort()
+{
+ Input_->Abort();
+}
+
+bool TStreamReaderBase::IsAborted() const
+{
+ return Input_->IsAborted();
+}
+
TStreamReaderBase::~TStreamReaderBase() = default;
TYPath TStreamReaderBase::Snapshot(const TYPath& path)
@@ -98,7 +108,7 @@ TFileReader::TFileReader(
Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
}
-std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IAbortableInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const ui64 currentOffset = StartOffset_ + readBytes;
@@ -130,7 +140,7 @@ TBlobTableReader::TBlobTableReader(
Path_ = TStreamReaderBase::Snapshot(path);
}
-std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IAbortableInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const i64 currentOffset = StartOffset_ + readBytes;
const i64 startPartIndex = currentOffset / Options_.PartSize_;
diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h
index d3efe90f26a..6c7bcc8935e 100644
--- a/yt/cpp/mapreduce/client/file_reader.h
+++ b/yt/cpp/mapreduce/client/file_reader.h
@@ -2,13 +2,12 @@
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
-class IInputStream;
-
namespace NYT {
class TPingableTransaction;
@@ -27,6 +26,9 @@ public:
const TClientContext& context,
const TTransactionId& transactionId);
+ void Abort() override;
+ bool IsAborted() const override;
+
~TStreamReaderBase();
protected:
@@ -37,13 +39,13 @@ protected:
private:
size_t DoRead(void* buf, size_t len) override;
- virtual std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0;
+ virtual std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0;
private:
const IClientRetryPolicyPtr ClientRetryPolicy_;
TFileReaderOptions FileReaderOptions_;
- std::unique_ptr<IInputStream> Input_;
+ std::unique_ptr<IAbortableInputStream> Input_;
std::unique_ptr<TPingableTransaction> ReadTransaction_;
@@ -66,7 +68,7 @@ public:
const TFileReaderOptions& options = {});
private:
- std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
+ std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
private:
const ui64 StartOffset_;
@@ -93,7 +95,7 @@ public:
const TBlobTableReaderOptions& options = {});
private:
- std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
+ std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
private:
const ui64 StartOffset_;
diff --git a/yt/cpp/mapreduce/client/partition_reader.cpp b/yt/cpp/mapreduce/client/partition_reader.cpp
index a2ca7eb9758..d849040d854 100644
--- a/yt/cpp/mapreduce/client/partition_reader.cpp
+++ b/yt/cpp/mapreduce/client/partition_reader.cpp
@@ -12,7 +12,7 @@ class TPartitionTableReader
: public TRawTableReader
{
public:
- TPartitionTableReader(std::unique_ptr<IInputStream> input)
+ TPartitionTableReader(std::unique_ptr<IAbortableInputStream> input)
: Input_(std::move(input))
{ }
@@ -32,6 +32,16 @@ public:
return false;
}
+ void Abort() override
+ {
+ Input_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Input_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -39,7 +49,7 @@ protected:
}
private:
- std::unique_ptr<IInputStream> Input_;
+ std::unique_ptr<IAbortableInputStream> Input_;
};
////////////////////////////////////////////////////////////////////////////////
@@ -52,7 +62,7 @@ TRawTableReaderPtr CreateTablePartitionReader(
const TTablePartitionReaderOptions& options)
{
- auto stream = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ auto stream = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>(
retryPolicy,
[&] (TMutationId /*mutationId*/) {
return rawClient->ReadTablePartition(cookie, format, options);
diff --git a/yt/cpp/mapreduce/common/abortable_stream.cpp b/yt/cpp/mapreduce/common/abortable_stream.cpp
new file mode 100644
index 00000000000..6336d792f14
--- /dev/null
+++ b/yt/cpp/mapreduce/common/abortable_stream.cpp
@@ -0,0 +1,124 @@
+#include "abortable_stream.h"
+
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/yt/core/concurrency/async_stream.h>
+#include <yt/yt/core/concurrency/scheduler_api.h>
+
+#include <library/cpp/yt/logging/logger.h>
+#include <library/cpp/yt/memory/ref.h>
+
+#include <util/system/spinlock.h>
+
+namespace NYT::NDetail {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAbortableInputStreamAdapter
+ : public IAbortableInputStream
+{
+public:
+ explicit TAbortableInputStreamAdapter(IAsyncInputStreamPtr underlyingStream)
+ : UnderlyingStream_(std::move(underlyingStream))
+ { }
+
+ void Abort() override
+ {
+ auto guard = Guard(Lock_);
+ IsAborted_ = true;
+ CurrentFuture_.Cancel(TError("Stream was aborted"));
+ }
+
+ bool IsAborted() const override
+ {
+ auto guard = Guard(Lock_);
+ return IsAborted_;
+ }
+
+private:
+ const IAsyncInputStreamPtr UnderlyingStream_;
+
+ TAdaptiveLock Lock_;
+ bool IsAborted_ = false;
+ TFuture<size_t> CurrentFuture_;
+
+ size_t DoRead(void* buffer, size_t length) override
+ {
+ if (length == 0) {
+ return 0;
+ }
+
+ struct TAbortableInputStreamBufferTag { };
+ auto readBuffer = TSharedMutableRef::Allocate<TAbortableInputStreamBufferTag>(length);
+
+ auto future = UnderlyingStream_->Read(readBuffer);
+ {
+ auto guard = Guard(Lock_);
+ CurrentFuture_ = future;
+ if (IsAborted_) {
+ future.Cancel(TError("Stream was aborted"));
+ }
+ }
+
+ auto result = WaitFor(future);
+
+ {
+ auto guard = Guard(Lock_);
+ if (IsAborted_) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
+ }
+
+ auto bytesRead = result.ValueOrThrow();
+
+ memcpy(buffer, readBuffer.Begin(), bytesRead);
+
+ return bytesRead;
+ }
+};
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapter(
+ IAsyncInputStreamPtr underlyingStream)
+{
+ YT_VERIFY(underlyingStream);
+ return std::make_unique<TAbortableInputStreamAdapter>(
+ std::move(underlyingStream));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAbortableStreamFallback
+ : public IAbortableInputStream
+{
+public:
+ explicit TAbortableStreamFallback(IInputStream* underlyingStream)
+ : UnderlyingStream_(underlyingStream)
+ { }
+
+ void Abort() override
+ {
+ YT_LOG_WARNING("Abort for this stream type is not supported");
+ }
+
+private:
+ IInputStream* const UnderlyingStream_;
+
+ size_t DoRead(void* buffer, size_t length) override
+ {
+ return UnderlyingStream_->Read(buffer, length);
+ }
+};
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapterFallback(
+ IInputStream* underlyingStream)
+{
+ YT_VERIFY(underlyingStream);
+ return std::make_unique<TAbortableStreamFallback>(underlyingStream);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/abortable_stream.h b/yt/cpp/mapreduce/common/abortable_stream.h
new file mode 100644
index 00000000000..8b58e6582fc
--- /dev/null
+++ b/yt/cpp/mapreduce/common/abortable_stream.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+
+#include <yt/yt/core/concurrency/async_stream.h>
+
+#include <memory>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapter(
+ NConcurrency::IAsyncInputStreamPtr underlyingStream);
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapterFallback(
+ IInputStream* underlyingStream);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/halting_stream.cpp b/yt/cpp/mapreduce/common/halting_stream.cpp
new file mode 100644
index 00000000000..ba14d9af316
--- /dev/null
+++ b/yt/cpp/mapreduce/common/halting_stream.cpp
@@ -0,0 +1,67 @@
+#include "halting_stream.h"
+
+#include <yt/yt/core/actions/bind.h>
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NDetail {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class THaltingAsyncStream
+ : public IAsyncInputStream
+{
+public:
+ THaltingAsyncStream(
+ IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt)
+ : Underlying_(std::move(underlying))
+ , BytesBeforeHalt_(bytesBeforeHalt)
+ { }
+
+private:
+ void OnRead(TPromise<size_t> promise, const TErrorOr<size_t>& result)
+ {
+ if (result.IsOK()) {
+ BytesRead_ += result.Value();
+ }
+ promise.TrySet(result);
+ }
+
+ TFuture<size_t> Read(const TSharedMutableRef& buffer) override
+ {
+ if (BytesRead_ >= BytesBeforeHalt_) {
+ HaltPromise_ = NewPromise<size_t>();
+ return HaltPromise_.ToFuture();
+ }
+
+ auto limit = std::min(buffer.Size(), static_cast<size_t>(BytesBeforeHalt_ - BytesRead_));
+ auto promise = NewPromise<size_t>();
+ auto future = promise.ToFuture();
+
+ Underlying_->Read(buffer.Slice(0, limit)).Subscribe(
+ BIND(&THaltingAsyncStream::OnRead, MakeStrong(this), std::move(promise)));
+
+ return future;
+ }
+
+private:
+ IAsyncInputStreamPtr Underlying_;
+ const i64 BytesBeforeHalt_;
+ i64 BytesRead_ = 0;
+ TPromise<size_t> HaltPromise_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IAsyncInputStreamPtr CreateHaltingAsyncStream(
+ IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt)
+{
+ return New<THaltingAsyncStream>(std::move(underlying), bytesBeforeHalt);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/halting_stream.h b/yt/cpp/mapreduce/common/halting_stream.h
new file mode 100644
index 00000000000..10b8f75e1f7
--- /dev/null
+++ b/yt/cpp/mapreduce/common/halting_stream.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+NConcurrency::IAsyncInputStreamPtr CreateHaltingAsyncStream(
+ NConcurrency::IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp
index abcb9e87b1a..9e63455242a 100644
--- a/yt/cpp/mapreduce/common/retry_lib.cpp
+++ b/yt/cpp/mapreduce/common/retry_lib.cpp
@@ -257,6 +257,8 @@ bool IsRetriable(const std::exception& ex)
{
if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) {
return false;
+ } else if (dynamic_cast<const TInputStreamAbortedError*>(&ex)) {
+ return false;
}
return true;
}
diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make
index 0b427d5ce40..6500da9e8e0 100644
--- a/yt/cpp/mapreduce/common/ya.make
+++ b/yt/cpp/mapreduce/common/ya.make
@@ -3,8 +3,10 @@ LIBRARY()
INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
- expected_error_guard.cpp
+ abortable_stream.cpp
debug_metrics.cpp
+ expected_error_guard.cpp
+ halting_stream.cpp
helpers.cpp
retry_lib.cpp
wait_proxy.cpp
@@ -19,6 +21,7 @@ PEERDIR(
library/cpp/yson/node
yt/cpp/mapreduce/interface
yt/cpp/mapreduce/interface/logging
+ yt/yt/core
)
END()
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp
index 7c337651f0a..a75d9e558ed 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.cpp
+++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp
@@ -52,8 +52,8 @@ public:
auto g = Guard(Lock_);
int result = 0;
for (auto& response : ResponseList_) {
- if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
- response.Abort();
+ if (!response.IsResponseAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
+ response.AbortResponse();
++result;
}
}
@@ -112,7 +112,7 @@ TAbortableHttpResponseBase::~TAbortableHttpResponseBase()
TAbortableHttpResponseRegistry::Get().Remove(this);
}
-void TAbortableHttpResponseBase::Abort()
+void TAbortableHttpResponseBase::AbortResponse()
{
Aborted_ = true;
}
@@ -121,7 +121,7 @@ void TAbortableHttpResponseBase::SetLengthLimit(size_t limit)
{
LengthLimit_ = limit;
if (LengthLimit_ == 0) {
- Abort();
+ AbortResponse();
}
}
@@ -130,7 +130,7 @@ const TString& TAbortableHttpResponseBase::GetUrl() const
return Url_;
}
-bool TAbortableHttpResponseBase::IsAborted() const
+bool TAbortableHttpResponseBase::IsResponseAborted() const
{
return Aborted_;
}
@@ -155,7 +155,7 @@ size_t TAbortableHttpResponse::DoRead(void* buf, size_t len)
auto read = THttpResponse::DoRead(buf, len);
LengthLimit_ -= read;
if (LengthLimit_ == 0) {
- Abort();
+ AbortResponse();
}
return read;
}
@@ -188,7 +188,7 @@ TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
}
TAbortableCoreHttpResponse::TAbortableCoreHttpResponse(
- std::unique_ptr<IInputStream> stream,
+ std::unique_ptr<IAbortableInputStream> stream,
const TString& url)
: TAbortableHttpResponseBase(url)
, Stream_(std::move(stream))
@@ -204,7 +204,7 @@ size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len)
auto read = Stream_->Read(buf, len);
LengthLimit_ -= read;
if (LengthLimit_ == 0) {
- Abort();
+ AbortResponse();
}
return read;
@@ -218,6 +218,17 @@ size_t TAbortableCoreHttpResponse::DoSkip(size_t len)
return Stream_->Skip(len);
}
+// IAbortableInputStream
+void TAbortableCoreHttpResponse::Abort()
+{
+ Stream_->Abort();
+}
+
+bool TAbortableCoreHttpResponse::IsAborted() const
+{
+ return Stream_->IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h
index e9b1483bf70..db8f97d50ba 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.h
+++ b/yt/cpp/mapreduce/http/abortable_http_response.h
@@ -35,9 +35,9 @@ class IAbortableHttpResponse
: public TIntrusiveListItem<IAbortableHttpResponse>
{
public:
- virtual void Abort() = 0;
+ virtual void AbortResponse() = 0;
virtual const TString& GetUrl() const = 0;
- virtual bool IsAborted() const = 0;
+ virtual bool IsResponseAborted() const = 0;
virtual void SetLengthLimit(size_t limit) = 0;
virtual ~IAbortableHttpResponse() = default;
@@ -50,9 +50,9 @@ public:
TAbortableHttpResponseBase(const TString& url);
~TAbortableHttpResponseBase();
- void Abort() override;
+ void AbortResponse() override;
const TString& GetUrl() const override;
- bool IsAborted() const override;
+ bool IsResponseAborted() const override;
void SetLengthLimit(size_t limit) override;
protected:
@@ -65,20 +65,24 @@ protected:
/// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors.
class TAbortableCoreHttpResponse
- : public IInputStream
+ : public IAbortableInputStream
, public TAbortableHttpResponseBase
{
public:
TAbortableCoreHttpResponse(
- std::unique_ptr<IInputStream> stream,
+ std::unique_ptr<IAbortableInputStream> stream,
const TString& url);
+ // IAbortableInputStream
+ void Abort() override;
+ bool IsAborted() const override;
+
private:
size_t DoRead(void* buf, size_t len) override;
size_t DoSkip(size_t len) override;
private:
- std::unique_ptr<IInputStream> Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
index 8f0fb723d2d..454ba761241 100644
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -5,7 +5,9 @@
#include "helpers.h"
#include "http.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
#include <yt/cpp/mapreduce/common/expected_error_guard.h>
+#include <yt/cpp/mapreduce/common/halting_stream.h>
#include <yt/cpp/mapreduce/interface/config.h>
@@ -126,9 +128,12 @@ public:
return Request_->GetHttpCode();
}
- IInputStream* GetResponseStream() override
+ IAbortableInputStream* GetResponseStream() override
{
- return Request_->GetResponseStream();
+ if (!Stream_) {
+ Stream_ = NDetail::CreateAbortableInputStreamAdapterFallback(Request_->GetResponseStream());
+ }
+ return Stream_.get();
}
TString GetResponse() override
@@ -143,6 +148,7 @@ public:
private:
std::unique_ptr<THttpRequest> Request_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
class TDefaultHttpRequest
@@ -225,11 +231,15 @@ public:
return static_cast<int>(Response_->GetStatusCode());
}
- IInputStream* GetResponseStream() override
+ IAbortableInputStream* GetResponseStream() override
{
if (!Stream_) {
+ NConcurrency::IAsyncInputStreamPtr asyncStream = NConcurrency::CreateCopyingAdapter(Response_);
+ if (TConfig::Get()->UseHaltingResponse) {
+ asyncStream = NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit);
+ }
auto stream = std::make_unique<TWrappedStream>(
- NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor),
+ NDetail::CreateAbortableInputStreamAdapter(std::move(asyncStream)),
Response_,
Context_.RequestId);
CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_);
@@ -277,15 +287,25 @@ public:
private:
class TWrappedStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
- TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
+ TWrappedStream(std::unique_ptr<IAbortableInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
: Underlying_(std::move(underlying))
, Response_(std::move(response))
, RequestId_(std::move(requestId))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -333,7 +353,7 @@ private:
}
private:
- std::unique_ptr<IInputStream> Underlying_;
+ std::unique_ptr<IAbortableInputStream> Underlying_;
NHttp::IResponsePtr Response_;
TString RequestId_;
};
@@ -341,7 +361,7 @@ private:
private:
TCoreRequestContext Context_;
NHttp::IResponsePtr Response_;
- std::unique_ptr<IInputStream> Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
class TCoreHttpRequest
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index d66e3e7c675..5ae57a200a7 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -2,6 +2,7 @@
#include "fwd.h"
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <util/datetime/base.h>
@@ -28,7 +29,7 @@ public:
virtual ~IHttpResponse() = default;
virtual int GetStatusCode() = 0;
- virtual IInputStream* GetResponseStream() = 0;
+ virtual IAbortableInputStream* GetResponseStream() = 0;
virtual TString GetResponse() = 0;
virtual TString GetRequestId() const = 0;
};
@@ -74,6 +75,16 @@ public:
Underlying_ = Response_->GetResponseStream();
}
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
private:
size_t DoRead(void *buf, size_t len) override
{
@@ -87,7 +98,7 @@ private:
private:
IHttpResponsePtr Response_;
- IInputStream* Underlying_;
+ IAbortableInputStream* Underlying_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp
index 4ccafb1a987..557e4afdbd4 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/http_client/raw_client.cpp
@@ -632,7 +632,7 @@ IFileReaderPtr THttpRawClient::GetJobTrace(
return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
-std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
+std::unique_ptr<IAbortableInputStream> THttpRawClient::ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options)
@@ -827,7 +827,7 @@ std::unique_ptr<IOutputStream> THttpRawClient::WriteTable(
return NRawClient::WriteTable(Context_, transactionId, path, format, options);
}
-std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
+std::unique_ptr<IAbortableInputStream> THttpRawClient::ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
@@ -854,7 +854,7 @@ std::unique_ptr<IOutputStream> THttpRawClient::WriteFile(
return NRawClient::WriteFile(Context_, transactionId, path, options);
}
-std::unique_ptr<IInputStream> THttpRawClient::ReadTablePartition(
+std::unique_ptr<IAbortableInputStream> THttpRawClient::ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options)
@@ -872,7 +872,7 @@ std::unique_ptr<IInputStream> THttpRawClient::ReadTablePartition(
return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
-std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
+std::unique_ptr<IAbortableInputStream> THttpRawClient::ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h
index f1e9c780789..d2a503605ff 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.h
+++ b/yt/cpp/mapreduce/http_client/raw_client.h
@@ -210,7 +210,7 @@ public:
// Files
- std::unique_ptr<IInputStream> ReadFile(
+ std::unique_ptr<IAbortableInputStream> ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options = {}) override;
@@ -290,18 +290,18 @@ public:
const TMaybe<TFormat>& format,
const TTableWriterOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadTable(
+ std::unique_ptr<IAbortableInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
const TTableReaderOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadTablePartition(
+ std::unique_ptr<IAbortableInputStream> ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadBlobTable(
+ std::unique_ptr<IAbortableInputStream> ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
diff --git a/yt/cpp/mapreduce/interface/abortable_stream.cpp b/yt/cpp/mapreduce/interface/abortable_stream.cpp
new file mode 100644
index 00000000000..fc3b8d01f5b
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/abortable_stream.cpp
@@ -0,0 +1,34 @@
+#include "abortable_stream.h"
+
+#include <yt/cpp/mapreduce/interface/errors.h>
+
+#include <util/system/yassert.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void IAbortableInputStream::Abort()
+{
+ Y_ABORT("Unimplemented");
+}
+
+bool IAbortableInputStream::IsAborted() const
+{
+ return false;
+}
+
+bool IAbortableInputStream::IsAbortedError(const std::exception_ptr& error)
+{
+ try {
+ std::rethrow_exception(error);
+ } catch (const TInputStreamAbortedError& ex) {
+ return true;
+ } catch (...) {
+ return false;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/abortable_stream.h b/yt/cpp/mapreduce/interface/abortable_stream.h
new file mode 100644
index 00000000000..40316e1b790
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/abortable_stream.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <util/stream/input.h>
+
+#include <exception>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Input stream that supports aborting the read operation.
+///
+/// Extends @ref IInputStream with the ability to immediately cancel current or future reads.
+class IAbortableInputStream
+ : public IInputStream
+{
+public:
+ ~IAbortableInputStream() override = default;
+
+ /// @brief Immediately abort the stream, cancelling current and future reads.
+ ///
+ /// Some clients have already implemented this interface, so using pure virtual method leads to build errors.
+ virtual void Abort();
+
+ /// @brief Check whether the stream has been aborted.
+ virtual bool IsAborted() const;
+
+ static bool IsAbortedError(const std::exception_ptr& error);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp
index 9379ab57473..b4edcee4370 100644
--- a/yt/cpp/mapreduce/interface/config.cpp
+++ b/yt/cpp/mapreduce/interface/config.cpp
@@ -457,6 +457,8 @@ void Serialize(const TConfig& config, NYson::IYsonConsumer* consumer)
.Item("api_file_path_options").Value(config.ApiFilePathOptions)
.Item("use_abortable_response").Value(config.UseAbortableResponse)
.Item("enable_debug_metrics").Value(config.EnableDebugMetrics)
+ .Item("use_halting_response").Value(config.UseHaltingResponse)
+ .Item("halting_response_bytes_limit").Value(config.HaltingResponseBytesLimit)
.Item("enable_local_mode_optimization").Value(config.EnableLocalModeOptimization)
.Item("write_stderr_successful_jobs").Value(config.WriteStderrSuccessfulJobs)
.Item("trace_http_requests_mode").Value(::ToString(config.TraceHttpRequestsMode))
@@ -533,6 +535,8 @@ void Deserialize(TConfig& config, const TNode& node)
DESERIALIZE_ITEM("api_file_path_options", config.ApiFilePathOptions);
DESERIALIZE_ITEM("use_abortable_response", config.UseAbortableResponse);
DESERIALIZE_ITEM("enable_debug_metrics", config.EnableDebugMetrics);
+ DESERIALIZE_ITEM("use_halting_response", config.UseHaltingResponse);
+ DESERIALIZE_ITEM("halting_response_bytes_limit", config.HaltingResponseBytesLimit);
DESERIALIZE_ITEM("enable_local_mode_optimization", config.EnableLocalModeOptimization);
DESERIALIZE_ITEM("write_stderr_successful_jobs", config.WriteStderrSuccessfulJobs);
DESERIALIZE_ITEM("trace_http_requests_mode", config.TraceHttpRequestsMode);
diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h
index 7bf3112c620..b0d77548956 100644
--- a/yt/cpp/mapreduce/interface/config.h
+++ b/yt/cpp/mapreduce/interface/config.h
@@ -239,6 +239,18 @@ struct TConfig
bool UseAbortableResponse = false;
bool EnableDebugMetrics = false;
+ /// @brief Simulate a response that halts (stops sending data) mid-stream.
+ ///
+ /// Testing options, should never be used in user programs.
+ /// When enabled, the HTTP response will be truncated after @ref HaltingResponseBytesLimit bytes.
+ bool UseHaltingResponse = false;
+
+ /// @brief Maximum number of bytes sent before the response is halted.
+ ///
+ /// Testing options, should never be used in user programs.
+ /// Only meaningful when @ref UseHaltingResponse is true.
+ i64 HaltingResponseBytesLimit = 64 * 1024;
+
//
// There is optimization used with local YT that enables to skip binary upload and use real binary path.
// When EnableLocalModeOptimization is set to false this optimization is completely disabled.
diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h
index 6f78ffe14f3..cb8a7b5af90 100644
--- a/yt/cpp/mapreduce/interface/errors.h
+++ b/yt/cpp/mapreduce/interface/errors.h
@@ -293,6 +293,13 @@ private:
TVector<TFailedJobInfo> FailedJobInfo_;
};
+///
+/// @brief Error that is thrown when trying to read from aborted reader.
+///
+class TInputStreamAbortedError
+ : public yexception
+{ };
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h
index 77078bb5eb5..ac1464f5b98 100644
--- a/yt/cpp/mapreduce/interface/io-inl.h
+++ b/yt/cpp/mapreduce/interface/io-inl.h
@@ -123,6 +123,8 @@ struct IReaderImplBase
virtual i64 GetTabletIndex() const;
virtual bool IsEndOfStream() const;
virtual bool IsRawReaderExhausted() const;
+ virtual void Abort();
+ virtual bool IsAborted() const;
};
struct INodeReaderImpl
@@ -223,6 +225,16 @@ public:
return Reader_->GetTabletIndex();
}
+ void Abort()
+ {
+ Reader_->Abort();
+ }
+
+ bool IsAborted() const
+ {
+ return Reader_->IsAborted();
+ }
+
protected:
template <typename TCacher, typename TCacheGetter>
const auto& DoGetRowCached(TCacher cacher, TCacheGetter cacheGetter) const
diff --git a/yt/cpp/mapreduce/interface/io.cpp b/yt/cpp/mapreduce/interface/io.cpp
index bbd417893c4..d3b4621c235 100644
--- a/yt/cpp/mapreduce/interface/io.cpp
+++ b/yt/cpp/mapreduce/interface/io.cpp
@@ -28,6 +28,16 @@ bool IReaderImplBase::IsRawReaderExhausted() const
Y_ABORT("Unimplemented");
}
+void IReaderImplBase::Abort()
+{
+ Y_ABORT("Unimplemented");
+}
+
+bool IReaderImplBase::IsAborted() const
+{
+ return false;
+}
+
////////////////////////////////////////////////////////////////////////////////
namespace NDetail {
diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h
index f8faf09663a..c80da6b041c 100644
--- a/yt/cpp/mapreduce/interface/io.h
+++ b/yt/cpp/mapreduce/interface/io.h
@@ -8,6 +8,7 @@
#include "fwd.h"
+#include "abortable_stream.h"
#include "client_method_options.h"
#include "common.h"
#include "distributed_session.h"
@@ -107,7 +108,7 @@ class TIOException
/// Interface representing YT file reader.
class IFileReader
: public TThrRefBase
- , public IInputStream
+ , public IAbortableInputStream
{ };
/// Interface representing YT file writer.
@@ -135,7 +136,7 @@ public:
/// Low-level interface to read YT table with retries.
class TRawTableReader
: public TThrRefBase
- , public IInputStream
+ , public IAbortableInputStream
{
public:
/// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`.
@@ -279,6 +280,12 @@ public:
/// Returns `true` if job raw input stream was closed and `false` otherwise.
bool IsRawReaderExhausted() const;
+
+ /// @brief Abort the reader, interrupting any in-progress or future reads.
+ void Abort();
+
+ /// @brief Check whether the reader is aborted.
+ bool IsAborted() const;
};
/// @brief Iterator for use in range-based-for.
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index e318c022ce5..9347a95e6e6 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -4,6 +4,8 @@
#include "client_method_options.h"
#include "operation.h"
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+
#include <yt/cpp/mapreduce/http/context.h>
namespace NYT {
@@ -213,7 +215,7 @@ public:
// Files
- virtual std::unique_ptr<IInputStream> ReadFile(
+ virtual std::unique_ptr<IAbortableInputStream> ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options = {}) = 0;
@@ -299,18 +301,18 @@ public:
const TMaybe<TFormat>& format,
const TTableWriterOptions& options = {}) = 0;
- virtual std::unique_ptr<IInputStream> ReadTable(
+ virtual std::unique_ptr<IAbortableInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
const TTableReaderOptions& options = {}) = 0;
- virtual std::unique_ptr<IInputStream> ReadTablePartition(
+ virtual std::unique_ptr<IAbortableInputStream> ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options = {}) = 0;
- virtual std::unique_ptr<IInputStream> ReadBlobTable(
+ virtual std::unique_ptr<IAbortableInputStream> ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make
index caa68b20f69..5e56861fa86 100644
--- a/yt/cpp/mapreduce/interface/ya.make
+++ b/yt/cpp/mapreduce/interface/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
+ abortable_stream.cpp
batch_request.cpp
client.cpp
client_method_options.cpp
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
index c6213e86651..05f3b1cca6b 100644
--- a/yt/cpp/mapreduce/io/counting_raw_reader.cpp
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
@@ -28,6 +28,16 @@ size_t TCountingRawTableReader::GetReadByteCount() const
return ReadByteCount_;
}
+void TCountingRawTableReader::Abort()
+{
+ Reader_->Abort();
+}
+
+bool TCountingRawTableReader::IsAborted() const
+{
+ return Reader_->IsAborted();
+}
+
size_t TCountingRawTableReader::DoRead(void* buf, size_t len)
{
auto readLen = Reader_->Read(buf, len);
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h
index c3b197d5844..fe78144d76a 100644
--- a/yt/cpp/mapreduce/io/counting_raw_reader.h
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.h
@@ -22,6 +22,9 @@ public:
size_t GetReadByteCount() const;
+ void Abort() override;
+ bool IsAborted() const override;
+
protected:
size_t DoRead(void* buf, size_t len) override;
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
index d7a1c9754ee..676400b1df3 100644
--- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
@@ -2,6 +2,7 @@
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <util/string/printf.h>
@@ -30,6 +31,9 @@ TLenvalTableReader::~TLenvalTableReader()
void TLenvalTableReader::CheckValidity() const
{
+ if (Input_.IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
if (!IsValid()) {
ythrow yexception() << "Iterator is not valid";
}
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
index 558c42b30ee..01eaf5d9462 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -3,6 +3,7 @@
#include <yt/cpp/mapreduce/common/node_builder.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <library/cpp/yson/parser.h>
@@ -341,6 +342,16 @@ bool TNodeTableReader::IsRawReaderExhausted() const
return Finished_;
}
+void TNodeTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TNodeTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
void TNodeTableReader::PrepareParsing()
@@ -368,6 +379,9 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error
void TNodeTableReader::CheckValidity() const
{
+ if (IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
if (!Valid_) {
ythrow yexception() << "Iterator is not valid";
}
diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h
index c8e319ce4f6..92af5b111b2 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.h
+++ b/yt/cpp/mapreduce/io/node_table_reader.h
@@ -11,8 +11,6 @@
#include <util/system/event.h>
#include <util/system/thread.h>
-#include <atomic>
-
namespace NYT {
class TRawTableReader;
@@ -54,6 +52,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
void NextImpl();
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp
index 6f79619c810..76e0243b395 100644
--- a/yt/cpp/mapreduce/io/proto_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp
@@ -206,6 +206,16 @@ bool TProtoTableReader::IsRawReaderExhausted() const
return NodeReader_->IsRawReaderExhausted();
}
+void TProtoTableReader::Abort()
+{
+ NodeReader_->Abort();
+}
+
+bool TProtoTableReader::IsAborted() const
+{
+ return NodeReader_->IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
TLenvalProtoTableReader::TLenvalProtoTableReader(
@@ -291,6 +301,16 @@ bool TLenvalProtoTableReader::IsRawReaderExhausted() const
return TLenvalTableReader::IsRawReaderExhausted();
}
+void TLenvalProtoTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TLenvalProtoTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
void TLenvalProtoTableReader::SkipRow()
{
while (true) {
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h
index bfe4ac56474..fdb1f2fd5f5 100644
--- a/yt/cpp/mapreduce/io/proto_table_reader.h
+++ b/yt/cpp/mapreduce/io/proto_table_reader.h
@@ -31,6 +31,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
std::unique_ptr<TNodeTableReader> NodeReader_;
@@ -64,6 +66,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
protected:
void SkipRow() override;
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
index f77f2d71307..6de1d222d53 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
@@ -1,5 +1,6 @@
#include "skiff_row_table_reader.h"
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/interface/skiff_row.h>
@@ -100,6 +101,9 @@ void TSkiffRowTableReader::SkipRow()
}
void TSkiffRowTableReader::CheckValidity() const {
+ if (IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
if (!IsValid()) {
ythrow yexception() << "Iterator is not valid";
}
@@ -230,6 +234,16 @@ bool TSkiffRowTableReader::IsRawReaderExhausted() const {
return Finished_;
}
+void TSkiffRowTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TSkiffRowTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
index 1f623570bb9..c8086b48b83 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
@@ -37,6 +37,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
bool Retry(const std::exception_ptr& error);
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
index c96adade41e..94662741b45 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
@@ -1,5 +1,6 @@
#include "skiff_table_reader.h"
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <library/cpp/yson/node/node_io.h>
@@ -155,6 +156,16 @@ bool TSkiffTableReader::IsRawReaderExhausted() const
return Finished_;
}
+void TSkiffTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TSkiffTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
@@ -288,6 +299,9 @@ void TSkiffTableReader::ReadRow()
void TSkiffTableReader::EnsureValidity() const
{
+ if (IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
Y_ENSURE(Valid_, "Iterator is not valid");
}
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h
index c7614776f41..ee01332beaf 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.h
@@ -35,6 +35,9 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
+
private:
struct TSkiffTableSchema;
diff --git a/yt/cpp/mapreduce/io/stream_raw_reader.cpp b/yt/cpp/mapreduce/io/stream_raw_reader.cpp
index ec19b67d0b0..f69eb9f034f 100644
--- a/yt/cpp/mapreduce/io/stream_raw_reader.cpp
+++ b/yt/cpp/mapreduce/io/stream_raw_reader.cpp
@@ -5,6 +5,10 @@
#include "skiff_table_reader.h"
#include "yamr_table_reader.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
+
+#include <yt/yt/core/concurrency/async_stream_helpers.h>
+
#include <util/system/env.h>
#include <util/string/type.h>
@@ -33,6 +37,12 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
+TInputStreamProxy::TInputStreamProxy(IInputStream* stream)
+ : Stream_(CreateAbortableInputStreamAdapter(NConcurrency::CreateAsyncAdapter(stream)))
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
IInputStream* stream,
const TTableReaderOptions& /* options */,
diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h
index a7694a0a709..826d0fc269a 100644
--- a/yt/cpp/mapreduce/io/stream_table_reader.h
+++ b/yt/cpp/mapreduce/io/stream_table_reader.h
@@ -1,5 +1,6 @@
#pragma once
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
#include <yt/cpp/mapreduce/interface/io.h>
namespace NYT {
@@ -11,9 +12,7 @@ class TInputStreamProxy
: public TRawTableReader
{
public:
- TInputStreamProxy(IInputStream* stream)
- : Stream_(stream)
- { }
+ explicit TInputStreamProxy(IInputStream* stream);
bool Retry(
const TMaybe<ui32>& /*rangeIndex*/,
@@ -31,6 +30,16 @@ public:
return false;
}
+ void Abort() override
+ {
+ Stream_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Stream_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -38,7 +47,7 @@ protected:
}
private:
- IInputStream* Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
index 3ae6b8a8aeb..e9ebab520cc 100644
--- a/yt/cpp/mapreduce/io/yamr_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
@@ -78,6 +78,16 @@ bool TYaMRTableReader::IsRawReaderExhausted() const
return TLenvalTableReader::IsRawReaderExhausted();
}
+void TYaMRTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TYaMRTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
void TYaMRTableReader::ReadField(TString* result, i32 length)
{
result->resize(length);
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.h b/yt/cpp/mapreduce/io/yamr_table_reader.h
index 39fdecfa71a..a2a6b97ebcb 100644
--- a/yt/cpp/mapreduce/io/yamr_table_reader.h
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.h
@@ -30,6 +30,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
void ReadField(TString* result, i32 length);
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
index 530fd58cb02..f5643b1f6a5 100644
--- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
@@ -5,8 +5,12 @@
#include "rpc_parameters_serialization.h"
#include "wrap_rpc_error.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
+#include <yt/cpp/mapreduce/common/halting_stream.h>
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/yt/client/api/file_reader.h>
@@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn)
////////////////////////////////////////////////////////////////////////////////
class TSyncRpcInputStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
- explicit TSyncRpcInputStream(std::unique_ptr<IInputStream> stream)
+ explicit TSyncRpcInputStream(std::unique_ptr<IAbortableInputStream> stream)
: Underlying_(std::move(stream))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
private:
- const std::unique_ptr<IInputStream> Underlying_;
+ const std::unique_ptr<IAbortableInputStream> Underlying_;
size_t DoRead(void* buf, size_t len) override
{
@@ -861,10 +875,15 @@ class TRpcResponseStream
: public IFileReader
{
public:
- TRpcResponseStream(std::unique_ptr<IInputStream> stream)
+ TRpcResponseStream(std::unique_ptr<IAbortableInputStream> stream)
: Underlying_(std::move(stream))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
private:
size_t DoRead(void *buf, size_t len) override
{
@@ -877,11 +896,11 @@ private:
}
private:
- std::unique_ptr<IInputStream> Underlying_;
+ std::unique_ptr<IAbortableInputStream> Underlying_;
};
class TFixedStringStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
TFixedStringStream(TSharedRef data)
@@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput(
{
auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- auto stream = std::make_unique<TSyncRpcInputStream>(CreateSyncAdapter(CreateCopyingAdapter(result)));
+ auto stream = std::make_unique<TSyncRpcInputStream>(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext(
NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)),
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result)));
+ std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr(
NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)),
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result.Data)));
+ std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result.Data)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace(
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)),
SerializeOptionsForGetJobTrace(options));
auto result = WaitAndProcess(future);
- auto stream = CreateSyncAdapter(CreateCopyingAdapter(result));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadFile(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options)
{
auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options));
auto reader = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
class TRpcWriteFileRequestStream
@@ -1245,7 +1264,7 @@ std::unique_ptr<IOutputStream> TRpcRawClient::WriteTable(
return std::make_unique<TSyncRpcOutputStream>(std::move(rowStream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadTable(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
@@ -1259,11 +1278,15 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTable(
auto formatStream = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream)));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto asyncStream = CreateCopyingAdapter(std::move(formatStream));
+ if (TConfig::Get()->UseHaltingResponse) {
+ asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit);
+ }
+ auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options)
@@ -1276,11 +1299,11 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition(
auto formatStream = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream)));
+ auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream)));
return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
@@ -1331,8 +1354,8 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
options.StartPartIndex_,
options.Offset_,
options.PartSize_);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
void TRpcRawClient::AlterTableReplica(
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.h b/yt/cpp/mapreduce/rpc_client/raw_client.h
index 07fbb19758c..0eea19df078 100644
--- a/yt/cpp/mapreduce/rpc_client/raw_client.h
+++ b/yt/cpp/mapreduce/rpc_client/raw_client.h
@@ -211,7 +211,7 @@ public:
// Files
- std::unique_ptr<IInputStream> ReadFile(
+ std::unique_ptr<IAbortableInputStream> ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options = {}) override;
@@ -291,18 +291,18 @@ public:
const TMaybe<TFormat>& format,
const TTableWriterOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadTable(
+ std::unique_ptr<IAbortableInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
const TTableReaderOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadTablePartition(
+ std::unique_ptr<IAbortableInputStream> ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadBlobTable(
+ std::unique_ptr<IAbortableInputStream> ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
diff --git a/yt/cpp/mapreduce/rpc_client/ya.make b/yt/cpp/mapreduce/rpc_client/ya.make
index 115dc550ff7..37fd90543a8 100644
--- a/yt/cpp/mapreduce/rpc_client/ya.make
+++ b/yt/cpp/mapreduce/rpc_client/ya.make
@@ -12,6 +12,7 @@ SRCS(
PEERDIR(
library/cpp/yson/node
+ yt/cpp/mapreduce/common
yt/cpp/mapreduce/interface
yt/yt/client
)
diff --git a/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h b/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h
index 46215ed5d0a..58263637652 100644
--- a/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h
+++ b/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h
@@ -45,6 +45,8 @@ void WriteProtoTable(const IClientBasePtr& client, const TString& tablePath, con
bool UseRpcClient();
+bool UseDefaultHttpClient();
+
////////////////////////////////////////////////////////////////////////////////
// TODO: should be removed, usages should be replaced with TConfigSaverGuard
@@ -218,6 +220,9 @@ void Out<NYT::NTesting::TOwningYaMRRow>(IOutputStream& out, const NYT::NTesting:
#define SKIP_IF_RPC() \
SKIP_TEST_IF(UseRpcClient(), "Unsupported test for RPC Client")
+#define SKIP_IF_DEFAULT_HTTP() \
+ SKIP_TEST_IF(UseDefaultHttpClient(), "Unsupported test for default (sync) HTTP Client")
+
#define YT_UNITTEST_LIB_H_
#include "yt_unittest_lib-inl.h"
#undef YT_UNITTEST_LIB_H_