blob: e1426bb97439aea1394afdedb2942358fd9735cf (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
#include <IO/WriteBuffer.h>
#include <IO/PeekableWriteBuffer.h>
namespace DB
{
template <typename Base, typename... Args>
class RowOutputFormatWithExceptionHandlerAdaptor : public Base
{
public:
RowOutputFormatWithExceptionHandlerAdaptor(const Block & header, WriteBuffer & out_, bool handle_exceptions, Args... args)
: Base(header, out_, std::forward<Args>(args)...)
{
if (handle_exceptions)
peekable_out = std::make_unique<PeekableWriteBuffer>(*Base::getWriteBufferPtr());
}
void consume(DB::Chunk chunk) override
{
if (!peekable_out)
{
Base::consume(std::move(chunk));
return;
}
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows; ++row)
{
/// It's important to set a checkpoint before writing row-between delimiter
peekable_out->setCheckpoint();
if (Base::haveWrittenData())
writeRowBetweenDelimiter();
try
{
write(columns, row);
}
catch (...)
{
peekable_out->rollbackToCheckpoint(/*drop=*/true);
throw;
}
peekable_out->dropCheckpoint();
Base::first_row = false;
}
}
void write(const Columns & columns, size_t row_num) override { Base::write(columns, row_num); }
void writeRowBetweenDelimiter() override { Base::writeRowBetweenDelimiter(); }
void flush() override
{
if (peekable_out)
peekable_out->next();
Base::flush();
}
void finalizeBuffers() override
{
if (peekable_out)
peekable_out->finalize();
Base::finalizeBuffers();
}
void resetFormatterImpl() override
{
Base::resetFormatterImpl();
if (peekable_out)
peekable_out = std::make_unique<PeekableWriteBuffer>(*Base::getWriteBufferPtr());
}
bool supportsWritingException() const override { return true; }
void setException(const String & exception_message_) override { exception_message = exception_message_; }
protected:
/// Returns buffer that should be used in derived classes instead of out.
WriteBuffer * getWriteBufferPtr() override
{
if (peekable_out)
return peekable_out.get();
return Base::getWriteBufferPtr();
}
String exception_message;
private:
std::unique_ptr<PeekableWriteBuffer> peekable_out;
};
}
|