summaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/cc_sqlalchemy/alembic/adapter.py
blob: d444b69e73a06dd8a9db84e2df441bc62dfa8fc7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from alembic.autogenerate import render
from alembic.autogenerate.api import AutogenContext
from alembic.autogenerate.compare import comparators
from alembic.operations import Operations, ops
from alembic.runtime.migration import MigrationContext
from alembic.util import DispatchPriority, PriorityDispatchResult

from clickhouse_connect.cc_sqlalchemy.datatypes.base import ChSqlaType
from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ClickHouseDDLHelper


@Operations.register_operation("add_column")
class ClickHouseAddColumnOp(ops.AddColumnOp):
    """Re-registers op.add_column with a **kw signature."""

    @classmethod
    def add_column(cls, operations, table_name, column, *, schema=None, if_not_exists=None, **kw):
        return operations.invoke(
            ops.AddColumnOp(
                table_name,
                column,
                schema=schema,
                if_not_exists=if_not_exists,
                **kw,
            )
        )


def patch_alembic_version(context: MigrationContext):
    """
    Compatibility hook for existing migration environments.

    Version-table behavior now lives on ClickHouseImpl and no longer requires
    monkey-patching the Alembic context.
    """
    return context


def _add_common_imports(directive):
    directive.imports.add("from clickhouse_connect import cc_sqlalchemy")
    directive.imports.add("from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import *  # noqa: F401,F403")
    directive.imports.add("from clickhouse_connect.cc_sqlalchemy.datatypes.sqltypes import *  # noqa: F401,F403")


def clickhouse_writer(context: MigrationContext, revision, directives):
    """
    A processing hook for autogeneration.

    Ensures that generated migration scripts include necessary imports
    and that ClickHouse-specific constructs like Engines are preserved.
    """
    for directive in directives:
        if directive.upgrade_ops and not directive.upgrade_ops.is_empty():
            _add_common_imports(directive)

        if directive.downgrade_ops and not directive.downgrade_ops.is_empty():
            _add_common_imports(directive)


def render_clickhouse_column(column, autogen_context: AutogenContext) -> str:
    rendered = render._user_defined_render("column", column, autogen_context)
    if rendered is not False:
        return rendered

    args = []
    opts = []

    if column.server_default:
        rendered_default = render._render_server_default(column.server_default, autogen_context)
        if rendered_default:
            if render._should_render_server_default_positionally(column.server_default):
                args.append(rendered_default)
            else:
                opts.append(("server_default", rendered_default))

    if column.autoincrement is not None and column.autoincrement != render.sqla_compat.AUTOINCREMENT_DEFAULT:
        opts.append(("autoincrement", column.autoincrement))

    explicit_nullable = ClickHouseDDLHelper.explicit_column_nullable(column)
    if column.nullable is not None and explicit_nullable is not None:
        opts.append(("nullable", column.nullable))

    if column.system:
        opts.append(("system", column.system))

    if column.comment:
        opts.append(("comment", repr(column.comment)))

    return "{prefix}Column({name!r}, {type}, {args}{kwargs})".format(
        prefix=render._sqlalchemy_autogenerate_prefix(autogen_context),
        name=render._ident(column.name),
        type=render._repr_type(column.type, autogen_context),
        args=", ".join(str(arg) for arg in args) + ", " if args else "",
        kwargs=", ".join(
            [f"{key}={value}" for key, value in opts]
            + [f"{key}={render._render_potential_expr(value, autogen_context)}" for key, value in column.kwargs.items()]
        ),
    )


@render.renderers.dispatch_for(ops.CreateTableOp, replace=True)
def render_create_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
    table = op.to_table()

    args = [column for column in [render_clickhouse_column(column, autogen_context) for column in table.columns] if column] + sorted(
        [
            constraint
            for constraint in [render._render_constraint(cons, autogen_context, op._namespace_metadata) for cons in table.constraints]
            if constraint is not None
        ]
    )

    if len(args) > render.MAX_PYTHON_ARGS:
        args_sql = "*[" + ",\n".join(args) + "]"
    else:
        args_sql = ",\n".join(args)

    prefix = render._alembic_autogenerate_prefix(autogen_context)
    rendered = f"{prefix}create_table({render._ident(op.table_name)!r},\n{args_sql}"
    if op.schema:
        rendered += f",\nschema={render._ident(op.schema)!r}"

    if table.comment:
        rendered += f",\ncomment={render._ident(table.comment)!r}"

    if table.info:
        rendered += f",\ninfo={table.info!r}"

    for key in sorted(op.kw):
        rendered += f",\n{key.replace(' ', '_')}={op.kw[key]!r}"

    if op.if_not_exists is not None:
        rendered += f",\nif_not_exists={bool(op.if_not_exists)!r}"

    rendered += "\n)"
    return rendered


@render.renderers.dispatch_for(ops.AddColumnOp, replace=True)
def render_add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
    schema, table_name, column, if_not_exists = op.schema, op.table_name, op.column, op.if_not_exists
    prefix = render._alembic_autogenerate_prefix(autogen_context)
    rendered_column = render_clickhouse_column(column, autogen_context)
    if autogen_context._has_batch:
        return f"{prefix}add_column({rendered_column})"
    rendered = f"{prefix}add_column({table_name!r}, {rendered_column}"
    if schema:
        rendered += f", schema={schema!r}"
    if if_not_exists is not None:
        rendered += f", if_not_exists={if_not_exists!r}"
    for key in sorted(op.kw):
        rendered += f", {key}={op.kw[key]!r}"
    return rendered + ")"


@render.renderers.dispatch_for(ops.DropTableOp, replace=True)
def render_drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
    prefix = render._alembic_autogenerate_prefix(autogen_context)
    rendered = f"{prefix}drop_table({render._ident(op.table_name)!r}"
    arguments = []
    if op.schema:
        arguments.append(f"schema={render._ident(op.schema)!r}")
    if op.if_exists is not None:
        arguments.append(f"if_exists={bool(op.if_exists)!r}")
    for key in sorted(op.table_kw):
        arguments.append(f"{key.replace(' ', '_')}={op.table_kw[key]!r}")
    if arguments:
        rendered += ",\n" + ",\n".join(arguments)
    rendered += ")"
    return rendered


def include_object(object_, name, type_, reflected, compare_to):
    """
    Standard filter for ClickHouse system tables and internal objects.
    """
    # Guard against None name which can happen in some Alembic versions/contexts
    if not name:
        return True

    if type_ == "table":
        if name == "alembic_version":
            return False
        # Ignore system tables
        if object_.schema == "system":
            return False
        # Ignore internal tables (Materialized View storage)
        if name.startswith(".inner"):
            return False

    return True


@comparators.dispatch_for("column", qualifier="clickhousedb", priority=DispatchPriority.FIRST, subgroup="nullable")
def compare_nullable(context, alter_column_op, schema, table_name, column_name, inspector_column, metadata_column):
    inspector_type = inspector_column.type
    metadata_type = metadata_column.type
    if not isinstance(inspector_type, ChSqlaType) or not isinstance(metadata_type, ChSqlaType):
        return PriorityDispatchResult.CONTINUE

    inspector_nullable = inspector_type.nullable
    explicit_nullable = ClickHouseDDLHelper.explicit_column_nullable(metadata_column)
    if explicit_nullable is None and not metadata_type.nullable:
        metadata_nullable = inspector_nullable
    else:
        metadata_nullable = ClickHouseDDLHelper.column_nullable(metadata_column)
    alter_column_op.existing_nullable = inspector_nullable
    if inspector_nullable != metadata_nullable:
        alter_column_op.modify_nullable = metadata_nullable
    return PriorityDispatchResult.STOP