summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client_reader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp37
1 files changed, 34 insertions, 3 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index 0bbc6fa1322..f60bf8b8547 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -76,7 +76,6 @@ TClientReader::TClientReader(
}
TransformYPath();
- CreateRequest();
}
bool TClientReader::Retry(
@@ -131,16 +130,25 @@ void TClientReader::ResetRetries()
void TClientReader::Abort()
{
- Input_->Abort();
+ auto g = Guard(Lock_);
+ AbortRequested_ = true;
+ if (Input_) {
+ Input_->Abort();
+ }
}
bool TClientReader::IsAborted() const
{
+ auto g = Guard(Lock_);
+ if (!Input_) {
+ return AbortRequested_;
+ }
return Input_->IsAborted();
}
size_t TClientReader::DoRead(void* buf, size_t len)
{
+ EnsureInitialized();
return Input_->Read(buf, len);
}
@@ -193,11 +201,34 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
}
- Input_ = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>(
+ auto newInput = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>(
CurrentRequestRetryPolicy_,
[this, &transactionId] (TMutationId /*mutationId*/) {
return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
});
+
+ auto g = Guard(Lock_);
+ // NB: Abort could've been called while we are waiting for newInput
+ if (AbortRequested_) {
+ newInput->Abort();
+ }
+
+ Input_ = std::move(newInput);
+}
+
+void TClientReader::EnsureInitialized()
+{
+ if (Input_) {
+ return;
+ }
+
+ {
+ auto g = Guard(Lock_);
+ if (AbortRequested_) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
+ }
+ CreateRequest();
}
////////////////////////////////////////////////////////////////////////////////