From 3e1899838408bbad47622007aa382bc8a2b01f87 Mon Sep 17 00:00:00 2001 From: max42 Date: Fri, 30 Jun 2023 11:13:34 +0300 Subject: Revert "YT-19324: move YT provider to ydb/library/yql" This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12. --- yt/cpp/mapreduce/client/client_reader.cpp | 232 ------------------------------ 1 file changed, 232 deletions(-) delete mode 100644 yt/cpp/mapreduce/client/client_reader.cpp (limited to 'yt/cpp/mapreduce/client/client_reader.cpp') diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp deleted file mode 100644 index 80759b12dcc..00000000000 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ /dev/null @@ -1,232 +0,0 @@ -#include "client_reader.h" - -#include "structured_table_formats.h" -#include "transaction.h" -#include "transaction_pinger.h" - -#include -#include -#include - -#include -#include - -#include - -#include -#include - -#include -#include -#include - -#include - -#include - -#include -#include -#include -#include -#include - -namespace NYT { - -using ::ToString; - -//////////////////////////////////////////////////////////////////////////////// - -TClientReader::TClientReader( - const TRichYPath& path, - IClientRetryPolicyPtr clientRetryPolicy, - ITransactionPingerPtr transactionPinger, - const TClientContext& context, - const TTransactionId& transactionId, - const TFormat& format, - const TTableReaderOptions& options, - bool useFormatFromTableAttributes) - : Path_(path) - , ClientRetryPolicy_(std::move(clientRetryPolicy)) - , Context_(context) - , ParentTransactionId_(transactionId) - , Format_(format) - , Options_(options) - , ReadTransaction_(nullptr) -{ - if (options.CreateTransaction_) { - Y_VERIFY(transactionPinger, "Internal error: transactionPinger is null"); - ReadTransaction_ = MakeHolder( - ClientRetryPolicy_, - Context_, - transactionId, - transactionPinger->GetChildTxPinger(), - TStartTransactionOptions()); - Path_.Path(Snapshot( - ClientRetryPolicy_, - Context_, - ReadTransaction_->GetId(), - path.Path_)); - } - - if (useFormatFromTableAttributes) { - auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_; - auto newFormat = GetTableFormat(ClientRetryPolicy_, Context_, transactionId2, Path_); - if (newFormat) { - Format_->Config = *newFormat; - } - } - - TransformYPath(); - CreateRequest(); -} - -bool TClientReader::Retry( - const TMaybe& rangeIndex, - const TMaybe& rowIndex) -{ - if (CurrentRequestRetryPolicy_) { - // TODO we should pass actual exception in Retry function - yexception genericError; - auto backoff = CurrentRequestRetryPolicy_->OnGenericError(genericError); - if (!backoff) { - return false; - } - } - - try { - CreateRequest(rangeIndex, rowIndex); - return true; - } catch (const std::exception& ex) { - YT_LOG_ERROR("Client reader retry failed: %v", - ex.what()); - - return false; - } -} - -void TClientReader::ResetRetries() -{ - CurrentRequestRetryPolicy_ = nullptr; -} - -size_t TClientReader::DoRead(void* buf, size_t len) -{ - return Input_->Read(buf, len); -} - -void TClientReader::TransformYPath() -{ - for (auto& range : Path_.MutableRangesView()) { - auto& exact = range.Exact_; - if (IsTrivial(exact)) { - continue; - } - - if (exact.RowIndex_) { - range.LowerLimit(TReadLimit().RowIndex(*exact.RowIndex_)); - range.UpperLimit(TReadLimit().RowIndex(*exact.RowIndex_ + 1)); - exact.RowIndex_.Clear(); - - } else if (exact.Key_) { - range.LowerLimit(TReadLimit().Key(*exact.Key_)); - - auto lastPart = TNode::CreateEntity(); - lastPart.Attributes() = TNode()("type", "max"); - exact.Key_->Parts_.push_back(lastPart); - - range.UpperLimit(TReadLimit().Key(*exact.Key_)); - exact.Key_.Clear(); - } - } -} - -void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybe& rowIndex) -{ - if (!CurrentRequestRetryPolicy_) { - CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest(); - } - while (true) { - CurrentRequestRetryPolicy_->NotifyNewAttempt(); - - THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); - if (Context_.ServiceTicketAuth) { - header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(Context_.Token); - } - auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - header.AddTransactionId(transactionId); - - const auto& controlAttributes = Options_.ControlAttributes_; - header.AddParameter("control_attributes", TNode() - ("enable_row_index", controlAttributes.EnableRowIndex_) - ("enable_range_index", controlAttributes.EnableRangeIndex_)); - header.SetOutputFormat(Format_); - - header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); - - if (rowIndex.Defined()) { - auto& ranges = Path_.MutableRanges(); - if (ranges.Empty()) { - ranges.ConstructInPlace(TVector{TReadRange()}); - } else { - if (rangeIndex.GetOrElse(0) >= ranges->size()) { - ythrow yexception() - << "range index " << rangeIndex.GetOrElse(0) - << " is out of range, input range count is " << ranges->size(); - } - ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); - } - ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); - } - - header.MergeParameters(FormIORequestParameters(Path_, Options_)); - - auto requestId = CreateGuidAsString(); - - try { - const auto proxyName = GetProxyForHeavyRequest(Context_); - Response_ = Context_.HttpClient->Request(GetFullUrl(proxyName, Context_, header), requestId, header); - - Input_ = Response_->GetResponseStream(); - - YT_LOG_DEBUG("RSP %v - table stream", requestId); - - return; - } catch (const TErrorResponse& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - if (!IsRetriable(e)) { - throw; - } - auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); - } catch (const std::exception& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - Response_.reset(); - Input_ = nullptr; - - auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); - } - } -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT -- cgit v1.3