aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Transforms/ExceptionKeepingTransform.h
blob: cec0e0eea3144af69baaae0dd52daf229a29b363 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#pragma once
#include <Processors/IProcessor.h>

namespace DB
{


class ThreadStatus;

/// Has one input and one output.
/// Works similarly to ISimpleTransform, but with much care about exceptions.
///
/// If input contain exception, this exception is pushed directly to output port.
/// If input contain data chunk, transform() is called for it.
/// When transform throws exception itself, data chunk is replaced by caught exception.
/// Transformed chunk or newly caught exception is pushed to output.
///
/// There may be any number of exceptions read from input, transform keeps the order.
/// It is expected that output port won't be closed from the other side before all data is processed.
///
/// Method onStart() is called before reading any data.
/// Method onFinish() is called after all data from input is processed, if no exception happened.
/// In case of exception, it is additionally pushed into pipeline.
class ExceptionKeepingTransform : public IProcessor
{
protected:
    InputPort & input;
    OutputPort & output;
    Port::Data data;

    enum class Stage
    {
        Start,
        Consume,
        Generate,
        Finish,
        Exception,
    };

    Stage stage = Stage::Start;
    bool ready_input = false;
    bool ready_output = false;
    const bool ignore_on_start_and_finish = true;

    struct GenerateResult
    {
        Chunk chunk;
        bool is_done = true;
    };

    virtual void onStart() {}
    virtual void onConsume(Chunk chunk) = 0;
    virtual GenerateResult onGenerate() = 0;
    virtual void onFinish() {}
    virtual void onException(std::exception_ptr /* exception */) {}

public:
    ExceptionKeepingTransform(const Block & in_header, const Block & out_header, bool ignore_on_start_and_finish_ = true);

    Status prepare() override;
    void work() override;

    InputPort & getInputPort() { return input; }
    OutputPort & getOutputPort() { return output; }

    void setRuntimeData(ThreadStatus * thread_status_, std::atomic_uint64_t * elapsed_counter_ms_);

private:
    ThreadStatus * thread_status = nullptr;
    std::atomic_uint64_t * elapsed_counter_ms = nullptr;
};

}