path: root/yt/cpp/mapreduce/client/file_reader.cpp
diff options
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/file_reader.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/file_reader.cpp')
1 files changed, 243 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
new file mode 100644
index 0000000000..fc21e0bc02
--- /dev/null
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -0,0 +1,243 @@
+#include "file_reader.h"
+#include "transaction.h"
+#include "transaction_pinger.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <yt/cpp/mapreduce/io/helpers.h>
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/http.h>
+#include <yt/cpp/mapreduce/http/http_client.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+namespace NYT {
+namespace NDetail {
+using ::ToString;
+static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
+ if (options.Length_) {
+ return options.Offset_.GetOrElse(0) + *options.Length_;
+ } else {
+ return Nothing();
+ }
+ IClientRetryPolicyPtr clientRetryPolicy,
+ ITransactionPingerPtr transactionPinger,
+ const TClientContext& context,
+ const TTransactionId& transactionId)
+ : Context_(context)
+ , ClientRetryPolicy_(std::move(clientRetryPolicy))
+ , ReadTransaction_(MakeHolder<TPingableTransaction>(
+ ClientRetryPolicy_,
+ context,
+ transactionId,
+ transactionPinger->GetChildTxPinger(),
+ TStartTransactionOptions()))
+{ }
+TStreamReaderBase::~TStreamReaderBase() = default;
+TYPath TStreamReaderBase::Snapshot(const TYPath& path)
+ return NYT::Snapshot(ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
+TString TStreamReaderBase::GetActiveRequestId() const
+ if (Response_) {
+ return Response_->GetRequestId();;
+ } else {
+ return "<no-active-request>";
+ }
+size_t TStreamReaderBase::DoRead(void* buf, size_t len)
+ const int retryCount = Context_.Config->ReadRetryCount;
+ for (int attempt = 1; attempt <= retryCount; ++attempt) {
+ try {
+ if (!Input_) {
+ Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_);
+ Input_ = Response_->GetResponseStream();
+ }
+ if (len == 0) {
+ return 0;
+ }
+ const size_t read = Input_->Read(buf, len);
+ CurrentOffset_ += read;
+ return read;
+ } catch (TErrorResponse& e) {
+ YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
+ GetActiveRequestId(),
+ e.what(),
+ attempt,
+ retryCount);
+ if (!IsRetriable(e) || attempt == retryCount) {
+ throw;
+ }
+ NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
+ } catch (std::exception& e) {
+ YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
+ GetActiveRequestId(),
+ e.what(),
+ attempt,
+ retryCount);
+ // Invalidate connection.
+ Response_.reset();
+ if (attempt == retryCount) {
+ throw;
+ }
+ NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
+ }
+ Input_ = nullptr;
+ }
+ Y_UNREACHABLE(); // we should either return or throw from loop above
+ const TRichYPath& path,
+ IClientRetryPolicyPtr clientRetryPolicy,
+ ITransactionPingerPtr transactionPinger,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TFileReaderOptions& options)
+ : TStreamReaderBase(std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
+ , FileReaderOptions_(options)
+ , Path_(path)
+ , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
+ , EndOffset_(GetEndOffset(FileReaderOptions_))
+ Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
+NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
+ const ui64 currentOffset = StartOffset_ + readBytes;
+ TString hostName = GetProxyForHeavyRequest(context);
+ THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion));
+ if (context.ServiceTicketAuth) {
+ header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(context.Token);
+ }
+ header.AddTransactionId(transactionId);
+ header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
+ if (EndOffset_) {
+ Y_VERIFY(*EndOffset_ >= currentOffset);
+ FileReaderOptions_.Length(*EndOffset_ - currentOffset);
+ }
+ FileReaderOptions_.Offset(currentOffset);
+ header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_));
+ header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
+ auto requestId = CreateGuidAsString();
+ NHttpClient::IHttpResponsePtr response;
+ try {
+ response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+ } catch (const std::exception& ex) {
+ LogRequestError(requestId, header, ex.what(), "");
+ throw;
+ }
+ YT_LOG_DEBUG("RSP %v - file stream",
+ requestId);
+ return response;
+ const TYPath& path,
+ const TKey& key,
+ IClientRetryPolicyPtr retryPolicy,
+ ITransactionPingerPtr transactionPinger,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TBlobTableReaderOptions& options)
+ : TStreamReaderBase(std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
+ , Key_(key)
+ , Options_(options)
+ Path_ = TStreamReaderBase::Snapshot(path);
+NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
+ TString hostName = GetProxyForHeavyRequest(context);
+ THttpHeader header("GET", "read_blob_table");
+ if (context.ServiceTicketAuth) {
+ header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(context.Token);
+ }
+ header.AddTransactionId(transactionId);
+ header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
+ const ui64 currentOffset = Options_.Offset_ + readBytes;
+ const i64 startPartIndex = currentOffset / Options_.PartSize_;
+ const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
+ auto lowerLimitKey = Key_;
+ lowerLimitKey.Parts_.push_back(startPartIndex);
+ auto upperLimitKey = Key_;
+ upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max());
+ TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange()
+ .LowerLimit(TReadLimit().Key(lowerLimitKey))
+ .UpperLimit(TReadLimit().Key(upperLimitKey))));
+ params["start_part_index"] = TNode(startPartIndex);
+ params["offset"] = skipBytes;
+ if (Options_.PartIndexColumnName_) {
+ params["part_index_column_name"] = *Options_.PartIndexColumnName_;
+ }
+ if (Options_.DataColumnName_) {
+ params["data_column_name"] = *Options_.DataColumnName_;
+ }
+ params["part_size"] = Options_.PartSize_;
+ header.MergeParameters(params);
+ header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
+ auto requestId = CreateGuidAsString();
+ NHttpClient::IHttpResponsePtr response;
+ try {
+ response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+ } catch (const std::exception& ex) {
+ LogRequestError(requestId, header, ex.what(), "");
+ throw;
+ }
+ YT_LOG_DEBUG("RSP %v - blob table stream",
+ requestId);
+ return response;
+} // namespace NDetail
+} // namespace NYT