aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h
blob: e1426bb97439aea1394afdedb2942358fd9735cf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#pragma once

#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>

#include <IO/WriteBuffer.h>
#include <IO/PeekableWriteBuffer.h>

namespace DB
{

template <typename Base, typename... Args>
class RowOutputFormatWithExceptionHandlerAdaptor : public Base
{
public:
    RowOutputFormatWithExceptionHandlerAdaptor(const Block & header, WriteBuffer & out_, bool handle_exceptions, Args... args)
        : Base(header, out_, std::forward<Args>(args)...)
    {
        if (handle_exceptions)
            peekable_out = std::make_unique<PeekableWriteBuffer>(*Base::getWriteBufferPtr());
    }

    void consume(DB::Chunk chunk) override
    {
        if (!peekable_out)
        {
            Base::consume(std::move(chunk));
            return;
        }

        auto num_rows = chunk.getNumRows();
        const auto & columns = chunk.getColumns();

        for (size_t row = 0; row < num_rows; ++row)
        {
            /// It's important to set a checkpoint before writing row-between delimiter
            peekable_out->setCheckpoint();

            if (Base::haveWrittenData())
                writeRowBetweenDelimiter();

            try
            {
                write(columns, row);
            }
            catch (...)
            {
                peekable_out->rollbackToCheckpoint(/*drop=*/true);
                throw;
            }
            peekable_out->dropCheckpoint();

            Base::first_row = false;
        }
    }

    void write(const Columns & columns, size_t row_num) override { Base::write(columns, row_num); }
    void writeRowBetweenDelimiter() override { Base::writeRowBetweenDelimiter(); }

    void flush() override
    {
        if (peekable_out)
            peekable_out->next();

        Base::flush();
    }

    void finalizeBuffers() override
    {
        if (peekable_out)
            peekable_out->finalize();
        Base::finalizeBuffers();
    }

    void resetFormatterImpl() override
    {
        Base::resetFormatterImpl();
        if (peekable_out)
            peekable_out = std::make_unique<PeekableWriteBuffer>(*Base::getWriteBufferPtr());
    }

    bool supportsWritingException() const override { return true; }

    void setException(const String & exception_message_) override { exception_message = exception_message_; }

protected:
    /// Returns buffer that should be used in derived classes instead of out.
    WriteBuffer * getWriteBufferPtr() override
    {
        if (peekable_out)
            return peekable_out.get();
        return Base::getWriteBufferPtr();
    }

    String exception_message;

private:

    std::unique_ptr<PeekableWriteBuffer> peekable_out;
};

}