From 52e702a66e34e2a53e005bec3e22b5d198c09139 Mon Sep 17 00:00:00 2001 From: achains Date: Fri, 24 Apr 2026 14:29:22 +0300 Subject: YT-26179: lazy read request commit_hash:a3758715df7ff97a0a471492dd907f949744e4d7 --- yt/cpp/mapreduce/client/client_reader.cpp | 37 ++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) (limited to 'yt/cpp/mapreduce/client/client_reader.cpp') diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 0bbc6fa1322..f60bf8b8547 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -76,7 +76,6 @@ TClientReader::TClientReader( } TransformYPath(); - CreateRequest(); } bool TClientReader::Retry( @@ -131,16 +130,25 @@ void TClientReader::ResetRetries() void TClientReader::Abort() { - Input_->Abort(); + auto g = Guard(Lock_); + AbortRequested_ = true; + if (Input_) { + Input_->Abort(); + } } bool TClientReader::IsAborted() const { + auto g = Guard(Lock_); + if (!Input_) { + return AbortRequested_; + } return Input_->IsAborted(); } size_t TClientReader::DoRead(void* buf, size_t len) { + EnsureInitialized(); return Input_->Read(buf, len); } @@ -193,11 +201,34 @@ void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybebegin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } - Input_ = NDetail::RequestWithRetry>( + auto newInput = NDetail::RequestWithRetry>( CurrentRequestRetryPolicy_, [this, &transactionId] (TMutationId /*mutationId*/) { return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); }); + + auto g = Guard(Lock_); + // NB: Abort could've been called while we are waiting for newInput + if (AbortRequested_) { + newInput->Abort(); + } + + Input_ = std::move(newInput); +} + +void TClientReader::EnsureInitialized() +{ + if (Input_) { + return; + } + + { + auto g = Guard(Lock_); + if (AbortRequested_) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } + } + CreateRequest(); } //////////////////////////////////////////////////////////////////////////////// -- cgit v1.3