aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/IInputFormat.h
blob: c7b8d97d1456b10e02b856253aa92780a74a5d42 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#pragma once

#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/ISource.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Formats/ColumnMapping.h>


namespace DB
{

struct SelectQueryInfo;

using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;

/** Input format is a source, that reads data from ReadBuffer.
  */
class IInputFormat : public ISource
{
protected:

    ReadBuffer * in [[maybe_unused]] = nullptr;

public:
    /// ReadBuffer can be nullptr for random-access formats.
    IInputFormat(Block header, ReadBuffer * in_);

    /// If the format is used by a SELECT query, this method may be called.
    /// The format may use it for filter pushdown.
    virtual void setQueryInfo(const SelectQueryInfo &, ContextPtr) {}

    /** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format.
     * The recreating of parser for each small stream takes too long, so we introduce a method
     * resetParser() which allow to reset the state of parser to continue reading of
     * source stream without recreating that.
     * That should be called after current buffer was fully read.
     */
    virtual void resetParser();

    virtual void setReadBuffer(ReadBuffer & in_);
    ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

    virtual const BlockMissingValues & getMissingValues() const
    {
        static const BlockMissingValues none;
        return none;
    }

    /// Must be called from ParallelParsingInputFormat after readSuffix
    ColumnMappingPtr getColumnMapping() const { return column_mapping; }
    /// Must be called from ParallelParsingInputFormat before readPrefix
    void setColumnMapping(ColumnMappingPtr column_mapping_) { column_mapping = column_mapping_; }

    size_t getCurrentUnitNumber() const { return current_unit_number; }
    void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; }

    void addBuffer(std::unique_ptr<ReadBuffer> buffer) { owned_buffers.emplace_back(std::move(buffer)); }

    void setErrorsLogger(const InputFormatErrorsLoggerPtr & errors_logger_) { errors_logger = errors_logger_; }

    virtual size_t getApproxBytesReadForChunk() const { return 0; }

    void needOnlyCount() { need_only_count = true; }

protected:
    virtual Chunk getChunkForCount(size_t rows);

    ColumnMappingPtr column_mapping{};

    InputFormatErrorsLoggerPtr errors_logger;

    bool need_only_count = false;

private:
    /// Number of currently parsed chunk (if parallel parsing is enabled)
    size_t current_unit_number = 0;

    std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
};

using InputFormatPtr = std::shared_ptr<IInputFormat>;

}