aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/StorageInput.cpp
blob: 0ad3afb2e8a437b1d902a07746853045ed67d76d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include <Storages/StorageInput.h>
#include <Storages/IStorage.h>

#include <Interpreters/Context.h>

#include <memory>
#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int INVALID_USAGE_OF_INPUT;
}

StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_)
    : IStorage(table_id)
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    setInMemoryMetadata(storage_metadata);
}


class StorageInputSource : public ISource, WithContext
{
public:
    StorageInputSource(ContextPtr context_, Block sample_block) : ISource(std::move(sample_block)), WithContext(context_) {}

    Chunk generate() override
    {
        auto block = getContext()->getInputBlocksReaderCallback()(getContext());
        if (!block)
            return {};

        UInt64 num_rows = block.rows();
        return Chunk(block.getColumns(), num_rows);
    }

    String getName() const override { return "Input"; }
};


void StorageInput::setPipe(Pipe pipe_)
{
    pipe = std::move(pipe_);
}


Pipe StorageInput::read(
    const Names & /*column_names*/,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & /*query_info*/,
    ContextPtr context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t /*max_block_size*/,
    size_t /*num_streams*/)
{
    Pipes pipes;
    auto query_context = context->getQueryContext();
    /// It is TCP request if we have callbacks for input().
    if (query_context->getInputBlocksReaderCallback())
    {
        /// Send structure to the client.
        query_context->initializeInput(shared_from_this());
        return Pipe(std::make_shared<StorageInputSource>(query_context, storage_snapshot->metadata->getSampleBlock()));
    }

    if (pipe.empty())
        throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query");

    return std::move(pipe);
}

}