1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
#include "file_writer.h"
#include <yt/cpp/mapreduce/io/helpers.h>
#include <yt/cpp/mapreduce/interface/finish_or_die.h>
#include <yt/cpp/mapreduce/common/helpers.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
TFileWriter::TFileWriter(
const TRichYPath& path,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId,
const TFileWriterOptions& options)
: AutoFinish_(options.AutoFinish_)
, RetryfulWriter_(
std::move(clientRetryPolicy),
std::move(transactionPinger),
context,
transactionId,
GetWriteFileCommand(context.Config->ApiVersion),
TMaybe<TFormat>(),
path,
options)
{ }
TFileWriter::~TFileWriter()
{
NDetail::FinishOrDie(this, AutoFinish_, "TFileWriter");
}
void TFileWriter::DoWrite(const void* buf, size_t len)
{
// If user tunes RetryBlockSize / DesiredChunkSize he expects
// us to send data exactly by RetryBlockSize. So behaviour of the writer is predictable.
//
// We want to avoid situation when size of sent data slightly exceeded DesiredChunkSize
// and server produced one chunk of desired size and one small chunk.
while (len > 0) {
const auto retryBlockRemainingSize = RetryfulWriter_.GetRetryBlockRemainingSize();
Y_ABORT_UNLESS(retryBlockRemainingSize > 0);
const auto firstWriteLen = Min(len, retryBlockRemainingSize);
RetryfulWriter_.Write(buf, firstWriteLen);
RetryfulWriter_.NotifyRowEnd();
len -= firstWriteLen;
buf = static_cast<const char*>(buf) + firstWriteLen;
}
}
void TFileWriter::DoFinish()
{
RetryfulWriter_.Finish();
}
size_t TFileWriter::GetBufferMemoryUsage() const
{
return RetryfulWriter_.GetBufferMemoryUsage();
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
|