1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
#include <Storages/Distributed/DistributedAsyncInsertHelpers.h>
#include <Storages/Distributed/DistributedAsyncInsertHeader.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CheckingCompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <QueryPipeline/RemoteInserter.h>
#include <Formats/NativeReader.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Error codes referenced by the helpers below. They are only declared here
/// (`extern`, without an initializer), so their values are defined elsewhere.
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int UNKNOWN_CODEC;
extern const int CANNOT_DECOMPRESS;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int EMPTY_DATA_PASSED;
extern const int DISTRIBUTED_BROKEN_BATCH_INFO;
extern const int DISTRIBUTED_BROKEN_BATCH_FILES;
}
/// Tells whether the error `code` is one of the codes that this function
/// classifies as a broken distributed send.
///
/// The 'remote_error' argument says whether the error was received from the
/// remote (receiver). It only changes the verdict for ATTEMPT_TO_READ_AFTER_EOF,
/// which is not reported as broken when it comes from the remote, since:
///
/// - the sender gets ATTEMPT_TO_READ_AFTER_EOF when the client just goes away,
///   i.e. the server had been restarted
/// - since #18853 the file is checked locally on the sender, so if something
///   is wrong with the file itself, ATTEMPT_TO_READ_AFTER_EOF is raised
///   locally first (not from the remote) and the batch is marked as broken.
bool isDistributedSendBroken(int code, bool remote_error)
{
    /// Codes reported as broken no matter which side produced the error.
    const int always_broken[] = {
        ErrorCodes::CHECKSUM_DOESNT_MATCH,
        ErrorCodes::EMPTY_DATA_PASSED,
        ErrorCodes::TOO_LARGE_SIZE_COMPRESSED,
        ErrorCodes::CANNOT_READ_ALL_DATA,
        ErrorCodes::UNKNOWN_CODEC,
        ErrorCodes::CANNOT_DECOMPRESS,
        ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
        ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES,
    };

    for (const int broken_code : always_broken)
    {
        if (code == broken_code)
            return true;
    }

    /// Reading past the end of data counts only when it did not come from the remote.
    return code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF && !remote_error;
}
/// Reads `in` through a decompressing buffer block by block (using the revision
/// stored in `distributed_header`), converts every block to the columns of the
/// remote header (columns are matched by name) and writes it to `remote`.
void writeAndConvert(RemoteInserter & remote, const DistributedAsyncInsertHeader & distributed_header, ReadBufferFromFile & in)
{
    CompressedReadBuffer uncompressed(in);
    NativeReader reader(uncompressed, distributed_header.revision);

    for (;;)
    {
        Block block = reader.read();

        /// The reader signals the end of data with a block that evaluates to false.
        if (!block)
            break;

        /// The conversion is rebuilt for each block from that block's own structure.
        const auto source_columns = block.cloneEmpty().getColumnsWithTypeAndName();
        const auto target_columns = remote.getHeader().getColumnsWithTypeAndName();

        auto conversion = std::make_shared<ExpressionActions>(
            ActionsDAG::makeConvertingActions(source_columns, target_columns, ActionsDAG::MatchColumnsMode::Name));

        conversion->execute(block);
        remote.write(block);
    }
}
/// Sends the contents of `in` to `remote`, either as is or through writeAndConvert().
///
/// The data is forwarded as is (only wrapped into CheckingCompressedReadBuffer) when:
/// - the remote header is empty, or
/// - the file header carries a block header with the same structure as the
///   remote header, compression is expected on the connection and the
///   revision of the file equals the revision of the server.
///
/// In every other case the blocks go through writeAndConvert().
void writeRemoteConvert(
    const DistributedAsyncInsertHeader & distributed_header,
    RemoteInserter & remote,
    bool compression_expected,
    ReadBufferFromFile & in,
    Poco::Logger * log)
{
    /// Writes the data as it was already prepared (more efficient path).
    const auto send_prepared = [&]
    {
        CheckingCompressedReadBuffer checking_in(in);
        remote.writePrepared(checking_in);
    };

    if (!remote.getHeader())
    {
        send_prepared();
        return;
    }

    if (!distributed_header.block_header)
    {
        /// This is the old format, which does not have the header of the block in the file header;
        /// applying the conversion in this case is not a big overhead.
        ///
        /// Anyway, the header can only be obtained from the first block, which contains all rows anyway.
        LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
    }
    else if (!blocksHaveEqualStructure(distributed_header.block_header, remote.getHeader()))
    {
        LOG_WARNING(log,
            "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
            remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
    }
    else if (compression_expected && distributed_header.revision == remote.getServerRevision())
    {
        /// Same structure, compressed connection, same revision: nothing has to be changed.
        send_prepared();
        return;
    }

    /// Reached for the old format, for a structure mismatch, when the connection
    /// does not use compression (the data has to be uncompressed), and when the
    /// revisions differ.
    writeAndConvert(remote, distributed_header, in);
}
}
|