aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h
blob: bb52e2aa516c0b1ef51c60004381f36d36584098 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#pragma once

#include <Formats/FormatSettings.h>
#include <Formats/SchemaInferenceUtils.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>


namespace DB
{

class ReadBuffer;


/// Base class for reading data in Columnar JSON formats.
class JSONColumnsReaderBase
{
public:
    JSONColumnsReaderBase(ReadBuffer & in_);

    virtual ~JSONColumnsReaderBase() = default;

    void setReadBuffer(ReadBuffer & in_) { in = &in_; }

    virtual void readChunkStart() = 0;
    virtual std::optional<String> readColumnStart() = 0;

    virtual bool checkChunkEnd() = 0;
    bool checkChunkEndOrSkipColumnDelimiter();

    bool checkColumnEnd();
    bool checkColumnEndOrSkipFieldDelimiter();

    void skipColumn();

protected:
    ReadBuffer * in;
};


/// Base class for Columnar JSON input formats. It works with data using
/// JSONColumnsReaderBase interface.
/// To implement new Columnar JSON format you need to implement new JSONColumnsReaderBase
/// interface and provide it to JSONColumnsBlockInputFormatBase.
class JSONColumnsBlockInputFormatBase : public IInputFormat
{
public:
    JSONColumnsBlockInputFormatBase(ReadBuffer & in_, const Block & header_, const FormatSettings & format_settings_, std::unique_ptr<JSONColumnsReaderBase> reader_);

    String getName() const override { return "JSONColumnsBlockInputFormatBase"; }

    void setReadBuffer(ReadBuffer & in_) override;

    const BlockMissingValues & getMissingValues() const override { return block_missing_values; }

    size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }

protected:
    Chunk generate() override;

    size_t readColumn(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, const String & column_name);

    const FormatSettings format_settings;
    const NamesAndTypes fields;
    /// Maps column names and their positions in header.
    Block::NameMap name_to_index;
    Serializations serializations;
    std::unique_ptr<JSONColumnsReaderBase> reader;
    BlockMissingValues block_missing_values;
    size_t approx_bytes_read_for_chunk = 0;
};


/// Base class for schema inference from Columnar JSON input formats. It works with data using
/// JSONColumnsReaderBase interface.
/// To implement schema reader for the new Columnar JSON format you need to implement new JSONColumnsReaderBase
/// interface and provide it to JSONColumnsSchemaReaderBase.
class JSONColumnsSchemaReaderBase : public ISchemaReader
{
public:
    JSONColumnsSchemaReaderBase(ReadBuffer & in_, const FormatSettings & format_settings_, std::unique_ptr<JSONColumnsReaderBase> reader_);

    void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type);

    bool needContext() const override { return !hints_str.empty(); }
    void setContext(ContextPtr & ctx) override;

    void setMaxRowsAndBytesToRead(size_t max_rows, size_t max_bytes) override
    {
        max_rows_to_read = max_rows;
        max_bytes_to_read = max_bytes;
    }

    size_t getNumRowsRead() const override { return total_rows_read; }

private:
    NamesAndTypesList readSchema() override;

    /// Read whole column in the block (up to max_rows rows) and extract the data type.
    DataTypePtr readColumnAndGetDataType(const String & column_name, size_t & rows_read, size_t max_rows);

    const FormatSettings format_settings;
    String hints_str;
    std::unordered_map<String, DataTypePtr> hints;
    String hints_parsing_error;
    std::unique_ptr<JSONColumnsReaderBase> reader;
    Names column_names_from_settings;
    JSONInferenceInfo inference_info;

    size_t total_rows_read = 0;
    size_t max_rows_to_read;
    size_t max_bytes_to_read;
};

}