aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/inspector.py
blob: d8936e67c23a582b041925079ff641adbc9d5b53 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import sqlalchemy.schema as sa_schema

from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm.exc import NoResultFound

from clickhouse_connect.cc_sqlalchemy.datatypes.base import sqla_type_from_name
from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import build_engine
from clickhouse_connect.cc_sqlalchemy.sql import full_table
from clickhouse_connect.cc_sqlalchemy import dialect_name as dn

# Column attributes that ChInspector.reflect_table treats specially: when one of these keys is present in a
# reflected column dictionary, it is prefixed with the dialect name before being passed to the Column constructor.
ch_col_args = ('default_type', 'codec_expression', 'ttl_expression')


def _quote_literal(value):
    """Render `value` as a single-quoted SQL string literal, escaping backslashes and single quotes."""
    escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
    return f"'{escaped}'"


def get_engine(connection, table_name, schema=None):
    """
    Look up the engine definition of a table in `system.tables` and build the matching engine object.

    :param connection: object whose `execute` method runs a SQL string and returns an iterator of rows
    :param table_name: name of the table to look up
    :param schema: database containing the table; when empty or None the connection's current database is used
    :return: result of `build_engine` applied to the table's `engine_full` value
    :raises NoResultFound: if no matching row exists in `system.tables`
    """
    # Without an explicit schema, compare against the current database rather than the literal string 'None'
    database = _quote_literal(schema) if schema else 'currentDatabase()'
    result_set = connection.execute(
        f'SELECT engine_full FROM system.tables WHERE database = {database} and name = {_quote_literal(table_name)}')
    row = next(result_set, None)
    if not row:
        qualified_name = f'{schema}.{table_name}' if schema else table_name
        raise NoResultFound(f'Table {qualified_name} does not exist')
    return build_engine(row.engine_full)


class ChInspector(Inspector):
    """Inspector subclass that reflects columns (with ClickHouse-specific attributes) and the table engine."""

    def reflect_table(self, table, include_columns, exclude_columns, *_args, **_kwargs):
        """
        Append the reflected columns to `table` and set its `engine` attribute.

        :param table: table object to populate; its `name` and `schema` identify the table to reflect
        :param include_columns: if non-empty, only columns whose names are in this collection are added
        :param exclude_columns: if non-empty, columns whose names are in this collection are skipped
        """
        schema = table.schema
        for col in self.get_columns(table.name, schema):
            name = col.pop('name')
            if (include_columns and name not in include_columns) or (exclude_columns and name in exclude_columns):
                continue
            col_type = col.pop('type')
            # Keys listed in ch_col_args are passed as `<dialect name>_<key>` keyword arguments; other keys pass
            # through unchanged.  Entries with falsy values are omitted.
            # NOTE(review): the `if value` filter also drops nullable=False and autoincrement=False, so Column
            # falls back to its own defaults for those -- confirm this is intended.
            col_args = {f'{dn}_{key}' if key in ch_col_args else key: value for key, value in col.items() if value}
            table.append_column(sa_schema.Column(name, col_type, **col_args))
        table.engine = get_engine(self.bind, table.name, schema)

    def get_columns(self, table_name, schema=None, **_kwargs):
        """
        Describe a table and return one dictionary per column.

        :param table_name: name of the table to describe
        :param schema: optional schema (database) passed to `full_table` to build the table identifier
        :return: list of dictionaries with the keys `name`, `type`, `nullable`, `autoincrement`, `default`,
          `default_type`, `comment`, `codec_expression` and `ttl_expression`
        :raises NoResultFound: if the DESCRIBE statement yields a falsy result
        """
        table_id = full_table(table_name, schema)
        result_set = self.bind.execute(f'DESCRIBE TABLE {table_id}')
        # NOTE(review): depending on the result object returned by `execute`, this guard may never trigger --
        # confirm how a missing table is actually reported.
        if not result_set:
            # Report the table identifier that was queried (previously the `full_table` function itself was
            # interpolated into the message)
            raise NoResultFound(f'Table {table_id} does not exist')
        columns = []
        for row in result_set:
            # Newlines are removed from the reported type name before it is resolved to a SQLAlchemy type
            sqla_type = sqla_type_from_name(row.type.replace('\n', ''))
            col = {'name': row.name,
                   'type': sqla_type,
                   'nullable': sqla_type.nullable,
                   'autoincrement': False,
                   'default': row.default_expression,
                   'default_type': row.default_type,
                   'comment': row.comment,
                   'codec_expression': row.codec_expression,
                   'ttl_expression': row.ttl_expression}
            columns.append(col)
        return columns


# Hack to provide backward compatibility for SQLAlchemy 1.3: make reflect_table also available under the
# `reflecttable` attribute name.
ChInspector.reflecttable = ChInspector.reflect_table