diff options
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 37 |
1 files changed, 34 insertions, 3 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 0bbc6fa1322..f60bf8b8547 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -76,7 +76,6 @@ TClientReader::TClientReader( } TransformYPath(); - CreateRequest(); } bool TClientReader::Retry( @@ -131,16 +130,25 @@ void TClientReader::ResetRetries() void TClientReader::Abort() { - Input_->Abort(); + auto g = Guard(Lock_); + AbortRequested_ = true; + if (Input_) { + Input_->Abort(); + } } bool TClientReader::IsAborted() const { + auto g = Guard(Lock_); + if (!Input_) { + return AbortRequested_; + } return Input_->IsAborted(); } size_t TClientReader::DoRead(void* buf, size_t len) { + EnsureInitialized(); return Input_->Read(buf, len); } @@ -193,11 +201,34 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } - Input_ = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>( + auto newInput = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>( CurrentRequestRetryPolicy_, [this, &transactionId] (TMutationId /*mutationId*/) { return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); }); + + auto g = Guard(Lock_); + // NB: Abort could've been called while we are waiting for newInput + if (AbortRequested_) { + newInput->Abort(); + } + + Input_ = std::move(newInput); +} + +void TClientReader::EnsureInitialized() +{ + if (Input_) { + return; + } + + { + auto g = Guard(Lock_); + if (AbortRequested_) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } + } + CreateRequest(); } //////////////////////////////////////////////////////////////////////////////// |
