diff options
| author | hiddenpath <[email protected]> | 2024-12-23 15:18:22 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-23 16:12:22 +0300 |
| commit | fac5ce91c704b4a1d3aa2f059842df3ba3a955bd (patch) | |
| tree | 00fa7a628e7aa63df387c095cf8a2c7cbac3e3e9 /yt/cpp/mapreduce/client/file_reader.cpp | |
| parent | fbc08848ae794f01d5107e6b556045e62fd36afe (diff) | |
YT-23616: Move read_blob_table and read_file to THttpRawClient
commit_hash:6c9209d019fa324c9ae4f182b18e7d089a32937d
Diffstat (limited to 'yt/cpp/mapreduce/client/file_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/file_reader.cpp | 172 |
1 files changed, 31 insertions, 141 deletions
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 06463d0af24..f88b40e38b6 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -31,7 +31,7 @@ using ::ToString; static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) { if (options.Length_) { - return options.Offset_.GetOrElse(0) + *options.Length_; + return options.Offset_ + *options.Length_; } else { return Nothing(); } @@ -46,7 +46,6 @@ TStreamReaderBase::TStreamReaderBase( const TClientContext& context, const TTransactionId& transactionId) : RawClient_(rawClient) - , Context_(context) , ClientRetryPolicy_(std::move(clientRetryPolicy)) , ReadTransaction_(MakeHolder<TPingableTransaction>( RawClient_, @@ -64,59 +63,26 @@ TYPath TStreamReaderBase::Snapshot(const TYPath& path) return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path); } -TString TStreamReaderBase::GetActiveRequestId() const -{ - if (Response_) { - return Response_->GetRequestId();; - } else { - return "<no-active-request>"; - } -} - size_t TStreamReaderBase::DoRead(void* buf, size_t len) { - const int retryCount = Context_.Config->ReadRetryCount; - for (int attempt = 1; attempt <= retryCount; ++attempt) { - try { - if (!Input_) { - Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_); - Input_ = Response_->GetResponseStream(); - } - if (len == 0) { - return 0; - } - const size_t read = Input_->Read(buf, len); - CurrentOffset_ += read; - return read; - } catch (TErrorResponse& e) { - YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", - GetActiveRequestId(), - e.what(), - attempt, - retryCount); - - if (!IsRetriable(e) || attempt == retryCount) { - throw; - } - TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); - } catch (std::exception& e) { - YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", - GetActiveRequestId(), - e.what(), - attempt, - retryCount); - - // Invalidate connection. - Response_.reset(); - - if (attempt == retryCount) { + if (len == 0) { + return 0; + } + return RequestWithRetry<size_t>( + ClientRetryPolicy_->CreatePolicyForReaderRequest(), + [this, &buf, len] (TMutationId /*mutationId*/) { + try { + if (!Input_) { + Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_); + } + const size_t read = Input_->Read(buf, len); + CurrentOffset_ += read; + return read; + } catch (...) { + Input_ = nullptr; throw; } - TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); - } - Input_ = nullptr; - } - Y_UNREACHABLE(); // we should either return or throw from loop above + }); } //////////////////////////////////////////////////////////////////////////////// @@ -130,57 +96,25 @@ TFileReader::TFileReader( const TTransactionId& transactionId, const TFileReaderOptions& options) : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId) - , FileReaderOptions_(options) + , StartOffset_(options.Offset_) + , EndOffset_(GetEndOffset(options)) + , Options_(options) , Path_(path) - , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0)) - , EndOffset_(GetEndOffset(FileReaderOptions_)) { Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_); } -NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const ui64 currentOffset = StartOffset_ + readBytes; - TString hostName = GetProxyForHeavyRequest(context); - - THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion)); - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - header.AddTransactionId(transactionId); - header.SetOutputFormat(TMaybe<TFormat>()); // Binary format if (EndOffset_) { Y_ABORT_UNLESS(*EndOffset_ >= currentOffset); - FileReaderOptions_.Length(*EndOffset_ - currentOffset); - } - FileReaderOptions_.Offset(currentOffset); - header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_)); - - header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); - - auto requestId = CreateGuidAsString(); - NHttpClient::IHttpResponsePtr response; - try { - response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - } catch (const std::exception& ex) { - LogRequestError(requestId, header, ex.what(), ""); - throw; + Options_.Length(*EndOffset_ - currentOffset); } - YT_LOG_DEBUG("RSP %v - file stream", - requestId); - - return response; + Options_.Offset(currentOffset); + return RawClient_->ReadFile(transactionId, Path_, Options_); } //////////////////////////////////////////////////////////////////////////////// @@ -195,66 +129,22 @@ TBlobTableReader::TBlobTableReader( const TTransactionId& transactionId, const TBlobTableReaderOptions& options) : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId) + , StartOffset_(options.Offset_) , Key_(key) , Options_(options) { Path_ = TStreamReaderBase::Snapshot(path); } -NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) { - TString hostName = GetProxyForHeavyRequest(context); - - THttpHeader header("GET", "read_blob_table"); - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - header.AddTransactionId(transactionId); - header.SetOutputFormat(TMaybe<TFormat>()); // Binary format - - const ui64 currentOffset = Options_.Offset_ + readBytes; + const i64 currentOffset = StartOffset_ + readBytes; const i64 startPartIndex = currentOffset / Options_.PartSize_; - const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; - auto lowerLimitKey = Key_; - lowerLimitKey.Parts_.push_back(startPartIndex); - auto upperLimitKey = Key_; - upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max()); - TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange() - .LowerLimit(TReadLimit().Key(lowerLimitKey)) - .UpperLimit(TReadLimit().Key(upperLimitKey)))); - params["start_part_index"] = TNode(startPartIndex); - params["offset"] = skipBytes; - if (Options_.PartIndexColumnName_) { - params["part_index_column_name"] = *Options_.PartIndexColumnName_; - } - if (Options_.DataColumnName_) { - params["data_column_name"] = *Options_.DataColumnName_; - } - params["part_size"] = Options_.PartSize_; - header.MergeParameters(params); - header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); - - auto requestId = CreateGuidAsString(); - NHttpClient::IHttpResponsePtr response; - try { - response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - } catch (const std::exception& ex) { - LogRequestError(requestId, header, ex.what(), ""); - throw; - } + const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; - YT_LOG_DEBUG("RSP %v - blob table stream", - requestId); - return response; + Options_.Offset(skipBytes); + Options_.StartPartIndex(startPartIndex); + return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_); } //////////////////////////////////////////////////////////////////////////////// |
