1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
#include "file_reader.h"
#include "transaction.h"
#include "transaction_pinger.h"
#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/tvm.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/io/helpers.h>
#include <yt/cpp/mapreduce/http/helpers.h>
#include <yt/cpp/mapreduce/http/http.h>
#include <yt/cpp/mapreduce/http/http_client.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
namespace NYT {
namespace NDetail {
using ::ToString;
////////////////////////////////////////////////////////////////////////////////
// Computes the offset just past the requested range (Offset_ + Length_).
// Returns Nothing() when the options carry no Length_, i.e. when no end
// offset can be derived from them.
static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
    if (!options.Length_) {
        return Nothing();
    }
    return options.Offset_ + *options.Length_;
}
////////////////////////////////////////////////////////////////////////////////
// Builds the state shared by the stream readers in this file.
//
// Stores the raw client and the retry policy, then creates the
// TPingableTransaction kept in ReadTransaction_. That transaction is
// constructed from `context`, `transactionId`, a pinger obtained via
// transactionPinger->GetChildTxPinger() and default-constructed
// TStartTransactionOptions.
// NOTE(review): presumably this starts a transaction nested under
// `transactionId` that is pinged for the reader's lifetime -- confirm
// against TPingableTransaction.
TStreamReaderBase::TStreamReaderBase(
const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId)
: RawClient_(rawClient)
, ClientRetryPolicy_(std::move(clientRetryPolicy))
// The members RawClient_ / ClientRetryPolicy_ are used below, not the
// constructor parameters: `clientRetryPolicy` has already been moved from.
// This relies on both members being initialized before ReadTransaction_
// (member declaration order lives in the header -- confirm there).
, ReadTransaction_(std::make_unique<TPingableTransaction>(
RawClient_,
ClientRetryPolicy_,
context,
transactionId,
transactionPinger->GetChildTxPinger(),
TStartTransactionOptions()))
{ }
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept here so that std::unique_ptr<TPingableTransaction>
// is destroyed where TPingableTransaction is a complete type -- confirm against the header.
TStreamReaderBase::~TStreamReaderBase() = default;
// Forwards `path` to NYT::Snapshot together with this reader's raw client,
// retry policy and the id of its read transaction, and returns the resulting
// path unchanged.
TYPath TStreamReaderBase::Snapshot(const TYPath& path)
{
    const auto& readTransactionId = ReadTransaction_->GetId();
    return NYT::Snapshot(RawClient_, ClientRetryPolicy_, readTransactionId, path);
}
// Reads up to `len` bytes into `buf` and returns the number of bytes the
// underlying stream reported as read. A zero-length request returns 0
// without touching the stream.
//
// The input stream is opened lazily through Request(), positioned at
// CurrentOffset_ (the count of bytes delivered so far). Each attempt runs
// under RequestWithRetry with the reader-request retry policy.
size_t TStreamReaderBase::DoRead(void* buf, size_t len)
{
    if (!len) {
        return 0;
    }

    auto readAttempt = [this, buf, len] (TMutationId /*mutationId*/) {
        try {
            if (!Input_) {
                // No live stream (first read, or the previous one failed):
                // open a new one starting at the bytes already consumed.
                Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_);
            }
            const size_t bytesRead = Input_->Read(buf, len);
            CurrentOffset_ += bytesRead;
            return bytesRead;
        } catch (...) {
            // Drop the stream so that a later attempt reopens it from
            // CurrentOffset_, then let the caller see the error.
            Input_ = nullptr;
            throw;
        }
    };

    return RequestWithRetry<size_t>(
        ClientRetryPolicy_->CreatePolicyForReaderRequest(),
        readAttempt);
}
////////////////////////////////////////////////////////////////////////////////
// Creates a reader for the file at `path`.
//
// Remembers the starting offset and, when options.Length_ is set, the end
// offset (see GetEndOffset), keeps a private copy of `options` and `path`,
// and finally replaces the stored path with the result of
// TStreamReaderBase::Snapshot() for it.
// NOTE(review): presumably the snapshot pins the file so that re-opened
// streams keep reading the same object -- confirm against NYT::Snapshot.
TFileReader::TFileReader(
    const TRichYPath& path,
    const IRawClientPtr& rawClient,
    IClientRetryPolicyPtr clientRetryPolicy,
    ITransactionPingerPtr transactionPinger,
    const TClientContext& context,
    const TTransactionId& transactionId,
    const TFileReaderOptions& options)
    : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
    , StartOffset_(options.Offset_)
    , EndOffset_(GetEndOffset(options))
    , Options_(options)
    , Path_(path)
{
    auto snapshotPath = TStreamReaderBase::Snapshot(Path_.Path_);
    Path_.Path_ = std::move(snapshotPath);
}
std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const ui64 currentOffset = StartOffset_ + readBytes;
if (EndOffset_) {
Y_ABORT_UNLESS(*EndOffset_ >= currentOffset);
Options_.Length(*EndOffset_ - currentOffset);
}
Options_.Offset(currentOffset);
return RawClient_->ReadFile(transactionId, Path_, Options_);
}
////////////////////////////////////////////////////////////////////////////////
// Creates a reader for the blob stored under `key` in the blob table at
// `path`.
//
// Remembers the starting offset from `options`, keeps copies of `key` and
// `options`, and stores the path returned by TStreamReaderBase::Snapshot()
// for `path` rather than `path` itself.
// NOTE(review): presumably the snapshot pins the table so that re-opened
// streams keep reading the same object -- confirm against NYT::Snapshot.
TBlobTableReader::TBlobTableReader(
    const TYPath& path,
    const TKey& key,
    const IRawClientPtr& rawClient,
    IClientRetryPolicyPtr retryPolicy,
    ITransactionPingerPtr transactionPinger,
    const TClientContext& context,
    const TTransactionId& transactionId,
    const TBlobTableReaderOptions& options)
    : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
    , StartOffset_(options.Offset_)
    , Key_(key)
    , Options_(options)
{
    auto snapshotPath = TStreamReaderBase::Snapshot(path);
    Path_ = std::move(snapshotPath);
}
std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const i64 currentOffset = StartOffset_ + readBytes;
const i64 startPartIndex = currentOffset / Options_.PartSize_;
const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
Options_.Offset(skipBytes);
Options_.StartPartIndex(startPartIndex);
return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_);
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NDetail
} // namespace NYT
|