diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/file_reader.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/file_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/file_reader.cpp | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp new file mode 100644 index 00000000000..fc21e0bc02d --- /dev/null +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -0,0 +1,243 @@ +#include "file_reader.h" + +#include "transaction.h" +#include "transaction_pinger.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/tvm.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/io/helpers.h> + +#include <yt/cpp/mapreduce/http/helpers.h> +#include <yt/cpp/mapreduce/http/http.h> +#include <yt/cpp/mapreduce/http/http_client.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +namespace NYT { +namespace NDetail { + +using ::ToString; + +//////////////////////////////////////////////////////////////////////////////// + +static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) { + if (options.Length_) { + return options.Offset_.GetOrElse(0) + *options.Length_; + } else { + return Nothing(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TStreamReaderBase::TStreamReaderBase( + IClientRetryPolicyPtr clientRetryPolicy, + ITransactionPingerPtr transactionPinger, + const TClientContext& context, + const TTransactionId& transactionId) + : Context_(context) + , ClientRetryPolicy_(std::move(clientRetryPolicy)) + , ReadTransaction_(MakeHolder<TPingableTransaction>( + ClientRetryPolicy_, + context, + transactionId, + transactionPinger->GetChildTxPinger(), + TStartTransactionOptions())) +{ } + +TStreamReaderBase::~TStreamReaderBase() = default; + +TYPath TStreamReaderBase::Snapshot(const TYPath& path) +{ + return NYT::Snapshot(ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path); +} + +TString TStreamReaderBase::GetActiveRequestId() const +{ + if (Response_) { + return Response_->GetRequestId();; + } else { + return "<no-active-request>"; + } +} + +size_t TStreamReaderBase::DoRead(void* buf, size_t len) +{ + const int retryCount = Context_.Config->ReadRetryCount; + for (int attempt = 1; attempt <= retryCount; ++attempt) { + try { + if (!Input_) { + Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_); + Input_ = Response_->GetResponseStream(); + } + if (len == 0) { + return 0; + } + const size_t read = Input_->Read(buf, len); + CurrentOffset_ += read; + return read; + } catch (TErrorResponse& e) { + YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", + GetActiveRequestId(), + e.what(), + attempt, + retryCount); + + if (!IsRetriable(e) || attempt == retryCount) { + throw; + } + NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); + } catch (std::exception& e) { + YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", + GetActiveRequestId(), + e.what(), + attempt, + retryCount); + + // Invalidate connection. + Response_.reset(); + + if (attempt == retryCount) { + throw; + } + NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); + } + Input_ = nullptr; + } + Y_UNREACHABLE(); // we should either return or throw from loop above +} + +//////////////////////////////////////////////////////////////////////////////// + +TFileReader::TFileReader( + const TRichYPath& path, + IClientRetryPolicyPtr clientRetryPolicy, + ITransactionPingerPtr transactionPinger, + const TClientContext& context, + const TTransactionId& transactionId, + const TFileReaderOptions& options) + : TStreamReaderBase(std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId) + , FileReaderOptions_(options) + , Path_(path) + , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0)) + , EndOffset_(GetEndOffset(FileReaderOptions_)) +{ + Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_); +} + +NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +{ + const ui64 currentOffset = StartOffset_ + readBytes; + TString hostName = GetProxyForHeavyRequest(context); + + THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion)); + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + header.AddTransactionId(transactionId); + header.SetOutputFormat(TMaybe<TFormat>()); // Binary format + + if (EndOffset_) { + Y_VERIFY(*EndOffset_ >= currentOffset); + FileReaderOptions_.Length(*EndOffset_ - currentOffset); + } + FileReaderOptions_.Offset(currentOffset); + header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_)); + + header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); + + auto requestId = CreateGuidAsString(); + NHttpClient::IHttpResponsePtr response; + try { + response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + } catch (const std::exception& ex) { + LogRequestError(requestId, header, ex.what(), ""); + throw; + } + + YT_LOG_DEBUG("RSP %v - file stream", + requestId); + + return response; +} + +//////////////////////////////////////////////////////////////////////////////// + +TBlobTableReader::TBlobTableReader( + const TYPath& path, + const TKey& key, + IClientRetryPolicyPtr retryPolicy, + ITransactionPingerPtr transactionPinger, + const TClientContext& context, + const TTransactionId& transactionId, + const TBlobTableReaderOptions& options) + : TStreamReaderBase(std::move(retryPolicy), std::move(transactionPinger), context, transactionId) + , Key_(key) + , Options_(options) +{ + Path_ = TStreamReaderBase::Snapshot(path); +} + +NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) +{ + TString hostName = GetProxyForHeavyRequest(context); + + THttpHeader header("GET", "read_blob_table"); + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + header.AddTransactionId(transactionId); + header.SetOutputFormat(TMaybe<TFormat>()); // Binary format + + const ui64 currentOffset = Options_.Offset_ + readBytes; + const i64 startPartIndex = currentOffset / Options_.PartSize_; + const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex; + auto lowerLimitKey = Key_; + lowerLimitKey.Parts_.push_back(startPartIndex); + auto upperLimitKey = Key_; + upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max()); + TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange() + .LowerLimit(TReadLimit().Key(lowerLimitKey)) + .UpperLimit(TReadLimit().Key(upperLimitKey)))); + params["start_part_index"] = TNode(startPartIndex); + params["offset"] = skipBytes; + if (Options_.PartIndexColumnName_) { + params["part_index_column_name"] = *Options_.PartIndexColumnName_; + } + if (Options_.DataColumnName_) { + params["data_column_name"] = *Options_.DataColumnName_; + } + params["part_size"] = Options_.PartSize_; + header.MergeParameters(params); + header.SetResponseCompression(ToString(context.Config->AcceptEncoding)); + + auto requestId = CreateGuidAsString(); + NHttpClient::IHttpResponsePtr response; + try { + response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + } catch (const std::exception& ex) { + LogRequestError(requestId, header, ex.what(), ""); + throw; + } + + YT_LOG_DEBUG("RSP %v - blob table stream", + requestId); + return response; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT |
