aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Dictionaries/CassandraDictionarySource.h
blob: 2591b33c6388d3517f47065e63a30ce3284c4260 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#pragma once

#include <Dictionaries/CassandraHelpers.h>

#if USE_CASSANDRA

#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include <Core/Block.h>
#include <Interpreters/Context_fwd.h>
#include <Poco/Logger.h>
#include <mutex>

namespace DB
{

class CassandraDictionarySource final : public IDictionarySource
{
public:

    struct Configuration
    {
        String host;
        UInt16 port;
        String user;
        String password;
        String db;
        String table;
        String query;

        CassConsistency consistency;
        bool allow_filtering;
        /// TODO get information about key from the driver
        size_t partition_key_prefix;
        size_t max_threads;
        String where;

        Configuration(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);

        void setConsistency(const String & config_str);
    };

    CassandraDictionarySource(
        const DictionaryStructure & dict_struct,
        const Configuration & configuration,
        const Block & sample_block);

    CassandraDictionarySource(
            const DictionaryStructure & dict_struct,
            const Poco::Util::AbstractConfiguration & config,
            const String & config_prefix,
            Block & sample_block);

    QueryPipeline loadAll() override;

    bool supportsSelectiveLoad() const override { return true; }

    bool isModified() const override { return true; }

    bool hasUpdateField() const override { return false; }

    DictionarySourcePtr clone() const override
    {
        return std::make_shared<CassandraDictionarySource>(dict_struct, configuration, sample_block);
    }

    QueryPipeline loadIds(const std::vector<UInt64> & ids) override;

    QueryPipeline loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;

    QueryPipeline loadUpdatedAll() override;

    String toString() const override;

private:
    void maybeAllowFiltering(String & query) const;
    CassSessionShared getSession();

    Poco::Logger * log;
    const DictionaryStructure dict_struct;
    const Configuration configuration;
    Block sample_block;
    ExternalQueryBuilder query_builder;

    CassClusterPtr cluster;

    std::mutex connect_mutex;
    CassSessionWeak maybe_session TSA_GUARDED_BY(connect_mutex);
};
}

#endif