// cgit page navigation: about | summary | refs | log | blame | commit | diff | stats
// path: root/yt/cpp/mapreduce/client/file_reader.cpp
// blob: fc21e0bc02d746a7efdac61b3c0f61d21e743e78 (plain) (tree)

















































































































































































































































                                                                                                                                           
#include "file_reader.h"

#include "transaction.h"
#include "transaction_pinger.h"

#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>

#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/tvm.h>

#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <yt/cpp/mapreduce/io/helpers.h>

#include <yt/cpp/mapreduce/http/helpers.h>
#include <yt/cpp/mapreduce/http/http.h>
#include <yt/cpp/mapreduce/http/http_client.h>
#include <yt/cpp/mapreduce/http/retry_request.h>

#include <yt/cpp/mapreduce/raw_client/raw_requests.h>

namespace NYT {
namespace NDetail {

using ::ToString;

////////////////////////////////////////////////////////////////////////////////

// Computes the end position of the requested byte range: offset (0 when not set) plus length.
// Yields an empty value when the options carry no length.
static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
    if (!options.Length_) {
        return Nothing();
    }
    const auto rangeStart = options.Offset_.GetOrElse(0);
    return rangeStart + *options.Length_;
}

////////////////////////////////////////////////////////////////////////////////

// Stores the client context and retry policy and creates the TPingableTransaction whose id
// is later passed to Snapshot() and to every Request() issued from DoRead().
// The transaction is built from `transactionId` and a child pinger obtained from `transactionPinger`.
TStreamReaderBase::TStreamReaderBase(
    IClientRetryPolicyPtr clientRetryPolicy,
    ITransactionPingerPtr transactionPinger,
    const TClientContext& context,
    const TTransactionId& transactionId)
    : Context_(context)
    , ClientRetryPolicy_(std::move(clientRetryPolicy))
    // The parameter was moved from on the line above, so the member is used here.
    // NOTE(review): relies on ClientRetryPolicy_ being declared before ReadTransaction_ — confirm in the header.
    , ReadTransaction_(MakeHolder<TPingableTransaction>(
        ClientRetryPolicy_,
        context,
        transactionId,
        transactionPinger->GetChildTxPinger(),
        TStartTransactionOptions()))
{ }

// Defaulted out of line.
// NOTE(review): presumably kept in this file so that members of types included only here
// (e.g. TPingableTransaction) are destroyed where they are complete — confirm against the header.
TStreamReaderBase::~TStreamReaderBase() = default;

// Forwards `path` to NYT::Snapshot together with this reader's retry policy, client context
// and the id of its read transaction, and returns the resulting path.
TYPath TStreamReaderBase::Snapshot(const TYPath& path)
{
    const auto& readTransactionId = ReadTransaction_->GetId();
    return NYT::Snapshot(ClientRetryPolicy_, Context_, readTransactionId, path);
}

// Returns the request id of the currently held response, or the placeholder
// "<no-active-request>" when no response is held.
TString TStreamReaderBase::GetActiveRequestId() const
{
    if (!Response_) {
        return "<no-active-request>";
    }
    return Response_->GetRequestId();
}

// Reads up to `len` bytes into `buf` and returns the number of bytes read, advancing
// CurrentOffset_ by that amount. A zero `len` still opens the stream if needed and returns 0.
//
// Whenever there is no open stream (Input_ is null) a request is issued via Request(), passing
// CurrentOffset_ so that a re-issued request continues after the bytes already delivered.
// The step is attempted up to Config->ReadRetryCount times:
//   - TErrorResponse is rethrown when it is not retriable or the attempts are exhausted;
//   - any other std::exception additionally drops the response and is rethrown when the
//     attempts are exhausted.
// Between attempts the call sleeps for GetBackoffDuration().
//
// After any failure Input_ is cleared *before* the decision to rethrow, so that a reader that
// is used again after an exception escaped re-issues the request instead of touching a stream
// that belongs to a failed (and possibly already released) response.
//
// NOTE: ReadRetryCount is expected to be at least 1; otherwise the loop body never runs and
// Y_UNREACHABLE below is hit.
size_t TStreamReaderBase::DoRead(void* buf, size_t len)
{
    const int retryCount = Context_.Config->ReadRetryCount;
    for (int attempt = 1; attempt <= retryCount; ++attempt) {
        try {
            if (!Input_) {
                Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_);
                Input_ = Response_->GetResponseStream();
            }
            if (len == 0) {
                return 0;
            }
            const size_t read = Input_->Read(buf, len);
            CurrentOffset_ += read;
            return read;
        } catch (TErrorResponse& e) {
            YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
                GetActiveRequestId(),
                e.what(),
                attempt,
                retryCount);

            // The stream of a failed response must not be reused, even if we rethrow below.
            Input_ = nullptr;

            if (!IsRetriable(e) || attempt == retryCount) {
                throw;
            }
            NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
        } catch (std::exception& e) {
            YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
                GetActiveRequestId(),
                e.what(),
                attempt,
                retryCount);

            // Invalidate connection. Input_ was obtained from Response_, so it is cleared first
            // to never outlive the response it came from.
            Input_ = nullptr;
            Response_.reset();

            if (attempt == retryCount) {
                throw;
            }
            NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
        }
    }
    Y_UNREACHABLE(); // we should either return or throw from loop above
}

////////////////////////////////////////////////////////////////////////////////

// Creates a reader for the file at `path`.
// The requested byte range is remembered as StartOffset_ (the offset option, 0 when unset) and
// EndOffset_ (as computed by GetEndOffset); the stored path is then replaced by its snapshot.
TFileReader::TFileReader(
    const TRichYPath& path,
    IClientRetryPolicyPtr clientRetryPolicy,
    ITransactionPingerPtr transactionPinger,
    const TClientContext& context,
    const TTransactionId& transactionId,
    const TFileReaderOptions& options)
    : TStreamReaderBase(
        std::move(clientRetryPolicy),
        std::move(transactionPinger),
        context,
        transactionId)
    , FileReaderOptions_(options)
    , Path_(path)
    , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
    , EndOffset_(GetEndOffset(FileReaderOptions_))
{
    // All subsequent requests are made against the snapshot path.
    auto snapshotPath = TStreamReaderBase::Snapshot(Path_.Path_);
    Path_.Path_ = std::move(snapshotPath);
}

// Sends a GET request for the file contents to a heavy-request proxy, starting `readBytes`
// bytes past StartOffset_, and returns the response.
//
// When an end offset is configured, the requested length is set to the distance from the
// current position to that end, so a re-issued request asks only for the remaining bytes.
// Any std::exception raised while sending is logged through LogRequestError and rethrown.
NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
{
    const ui64 currentOffset = StartOffset_ + readBytes;
    const TString proxyHost = GetProxyForHeavyRequest(context);

    THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion));

    // Authenticate with a service ticket when one is configured, otherwise with the token.
    if (!context.ServiceTicketAuth) {
        header.SetToken(context.Token);
    } else {
        header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
    }
    header.AddTransactionId(transactionId);
    header.SetOutputFormat(TMaybe<TFormat>()); // Binary format

    // Point the stored options at the part of the range that has not been read yet.
    if (EndOffset_) {
        Y_VERIFY(*EndOffset_ >= currentOffset);
        FileReaderOptions_.Length(*EndOffset_ - currentOffset);
    }
    FileReaderOptions_.Offset(currentOffset);
    header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_));

    header.SetResponseCompression(ToString(context.Config->AcceptEncoding));

    auto requestId = CreateGuidAsString();
    NHttpClient::IHttpResponsePtr fileResponse;
    try {
        // The URL is built inside the try block so that its failures are logged as well.
        const auto url = GetFullUrl(proxyHost, context, header);
        fileResponse = context.HttpClient->Request(url, requestId, header);
    } catch (const std::exception& ex) {
        LogRequestError(requestId, header, ex.what(), "");
        throw;
    }

    YT_LOG_DEBUG("RSP %v - file stream",
        requestId);

    return fileResponse;
}

////////////////////////////////////////////////////////////////////////////////

// Creates a reader for the blob stored under `key` in the table at `path`.
// Keeps copies of the key and the options and stores the snapshot of `path` as Path_.
TBlobTableReader::TBlobTableReader(
    const TYPath& path,
    const TKey& key,
    IClientRetryPolicyPtr retryPolicy,
    ITransactionPingerPtr transactionPinger,
    const TClientContext& context,
    const TTransactionId& transactionId,
    const TBlobTableReaderOptions& options)
    : TStreamReaderBase(
        std::move(retryPolicy),
        std::move(transactionPinger),
        context,
        transactionId)
    , Key_(key)
    , Options_(options)
{
    // All subsequent requests are made against the snapshot path.
    auto snapshotPath = TStreamReaderBase::Snapshot(path);
    Path_ = std::move(snapshotPath);
}

// Sends a "read_blob_table" GET request to a heavy-request proxy and returns the response.
//
// The position to read from is Options_.Offset_ + `readBytes`. It is split into the index of the
// part containing it ("start_part_index") and the number of bytes to skip inside that part
// ("offset"). The table range is limited to keys that extend Key_ with a part index between
// that start index and the maximum i64 value.
// Any std::exception raised while sending is logged through LogRequestError and rethrown.
NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
{
    const TString proxyHost = GetProxyForHeavyRequest(context);

    THttpHeader header("GET", "read_blob_table");

    // Authenticate with a service ticket when one is configured, otherwise with the token.
    if (!context.ServiceTicketAuth) {
        header.SetToken(context.Token);
    } else {
        header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
    }
    header.AddTransactionId(transactionId);
    header.SetOutputFormat(TMaybe<TFormat>()); // Binary format

    // Locate the part that holds the current position and the offset within that part.
    const ui64 absoluteOffset = Options_.Offset_ + readBytes;
    const i64 firstPartIndex = absoluteOffset / Options_.PartSize_;
    const ui64 bytesToSkip = absoluteOffset - Options_.PartSize_ * firstPartIndex;

    // Key range: [Key_ + firstPartIndex, Key_ + max i64).
    auto rangeBegin = Key_;
    rangeBegin.Parts_.push_back(firstPartIndex);
    auto rangeEnd = Key_;
    rangeEnd.Parts_.push_back(std::numeric_limits<i64>::max());

    const auto readRange = TReadRange()
        .LowerLimit(TReadLimit().Key(rangeBegin))
        .UpperLimit(TReadLimit().Key(rangeEnd));
    TNode params = PathToParamNode(TRichYPath(Path_).AddRange(readRange));

    params["start_part_index"] = TNode(firstPartIndex);
    params["offset"] = bytesToSkip;
    // Column names are sent only when they are set in the options.
    if (Options_.PartIndexColumnName_) {
        params["part_index_column_name"] = *Options_.PartIndexColumnName_;
    }
    if (Options_.DataColumnName_) {
        params["data_column_name"] = *Options_.DataColumnName_;
    }
    params["part_size"] = Options_.PartSize_;
    header.MergeParameters(params);
    header.SetResponseCompression(ToString(context.Config->AcceptEncoding));

    auto requestId = CreateGuidAsString();
    NHttpClient::IHttpResponsePtr blobResponse;
    try {
        // The URL is built inside the try block so that its failures are logged as well.
        const auto url = GetFullUrl(proxyHost, context, header);
        blobResponse = context.HttpClient->Request(url, requestId, header);
    } catch (const std::exception& ex) {
        LogRequestError(requestId, header, ex.what(), "");
        throw;
    }

    YT_LOG_DEBUG("RSP %v - blob table stream",
        requestId);
    return blobResponse;
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NDetail
} // namespace NYT