aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Interpreters/TableJoin.h
blob: 7736fbfcf5c4f7217a5b2279242eabf648c89f6a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#pragma once

#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Core/SettingsEnums.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/JoinUtils.h>
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/IKeyValueEntity.h>

#include <Common/Exception.h>
#include <Parsers/IAST_fwd.h>

#include <cstddef>
#include <unordered_map>

#include <utility>
#include <memory>
#include <base/types.h>

namespace DB
{

/// Forward declarations of types that this header names without needing their full definitions here.
class Context;
class ASTSelectQuery;
struct DatabaseAndTableWithAlias;
class Block;
class DictionaryJoinAdapter;
class StorageJoin;
class StorageDictionary;
/// NOTE(review): <Interpreters/IKeyValueEntity.h> is already included above, so this forward declaration looks redundant — confirm.
class IKeyValueEntity;

struct ColumnWithTypeAndName;
using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>;

struct Settings;

class IVolume;
/// Shared-ownership handle to a volume; stored by TableJoin as `tmp_volume`.
using VolumePtr = std::shared_ptr<IVolume>;

class TableJoin
{
public:
    using NameToTypeMap = std::unordered_map<String, DataTypePtr>;

    /// Corresponds to one disjunct
    struct JoinOnClause
    {
        Names key_names_left;
        Names key_names_right; /// Duplicating right key names are qualified

        /** JOIN ON a1 == a2 AND b1 <=> b2 AND c1 == c2 AND d1 <=> d2
          * key_names_left:  [a1, b1, c1, d1]
          * key_names_right: [a2, b2, c2, d2]
          * nullsafe_compare_key_indexes: {1, 3}
          */
        std::unordered_set<size_t> nullsafe_compare_key_indexes;

        ASTPtr on_filter_condition_left;
        ASTPtr on_filter_condition_right;

        std::string analyzer_left_filter_condition_column_name;
        std::string analyzer_right_filter_condition_column_name;

        JoinOnClause() = default;

        void addKey(const String & left_name, const String & right_name, bool null_safe_comparison)
        {
            key_names_left.push_back(left_name);
            key_names_right.push_back(right_name);
            if (null_safe_comparison)
                nullsafe_compare_key_indexes.insert(key_names_left.size() - 1);
        }

        std::pair<String, String> condColumnNames() const
        {
            std::pair<String, String> res;

            if (!analyzer_left_filter_condition_column_name.empty())
                res.first = analyzer_left_filter_condition_column_name;

            if (!analyzer_right_filter_condition_column_name.empty())
                res.second = analyzer_right_filter_condition_column_name;

            if (on_filter_condition_left)
                res.first = on_filter_condition_left->getColumnName();
            if (on_filter_condition_right)
                res.second = on_filter_condition_right->getColumnName();

            return res;
        }

        size_t keysCount() const
        {
            assert(key_names_left.size() == key_names_right.size());
            return key_names_right.size();
        }

        String formatDebug(bool short_format = false) const
        {
            const auto & [left_cond, right_cond] = condColumnNames();

            if (short_format)
            {
                return fmt::format("({}) = ({}){}{}", fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
                                   !left_cond.empty() ? " AND " + left_cond : "", !right_cond.empty() ? " AND " + right_cond : "");
            }

            return fmt::format(
                "Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'",
                 fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "), left_cond, right_cond);
        }
    };

    using Clauses = std::vector<JoinOnClause>;

    static std::string formatClauses(const Clauses & clauses, bool short_format = false)
    {
        std::vector<std::string> res;
        for (const auto & clause : clauses)
            res.push_back("[" + clause.formatDebug(short_format) + "]");
        return fmt::format("{}", fmt::join(res, "; "));
    }

private:
    /** NOTE(review): this comment appears to describe how join key names are tracked on each side
      * (see `key_asts_left` / `key_asts_right` and `clauses` below) rather than the `size_limits`
      * member that immediately follows it.
      *
      * Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k`
      * The join is made by column k.
      * During the JOIN,
      *  - in the "right" table, it will be available by alias `k`, since `Project` action for the subquery was executed.
      *  - in the "left" table, it will be accessible by the name `expr(x)`, since `Project` action has not been executed yet.
      * You must remember both of these options.
      *
      * Query of the form `SELECT ... from t1 ANY LEFT JOIN (SELECT ... from t2) ON expr(t1 columns) = expr(t2 columns)`
      *     to the subquery will be added expression `expr(t2 columns)`.
      * It's possible to use name `expr(t2 columns)`.
      */
    /// Exposed through sizeLimits().
    SizeLimits size_limits;
    /// The `const` members below can only be given a different value in a constructor's initializer list;
    /// each one is exposed through a getter of the public interface.
    const size_t default_max_bytes = 0;
    const bool join_use_nulls = false;
    const size_t max_joined_block_rows = 0;
    /// Set of join algorithms; queried by isEnabledAlgorithm().
    MultiEnum<JoinAlgorithm> join_algorithm = MultiEnum<JoinAlgorithm>(JoinAlgorithm::AUTO);
    const size_t partial_merge_join_rows_in_right_blocks = 0;
    const size_t partial_merge_join_left_table_buffer_bytes = 0;
    const size_t max_files_to_merge = 0;
    const String temporary_files_codec = "LZ4";

    /// NOTE(review): presumably the key expressions passed to addKey() as left_ast / right_ast — confirm in the .cpp.
    ASTs key_asts_left;
    ASTs key_asts_right;

    /// One JoinOnClause per disjunct; getOnlyClause() returns the first element.
    Clauses clauses;

    /// Source of kind(), strictness(), hasUsing() and hasOn().
    ASTTableJoin table_join;

    ASOFJoinInequality asof_inequality = ASOFJoinInequality::GreaterOrEquals;

    /// All columns which can be read from joined table. Duplicating names are qualified.
    NamesAndTypesList columns_from_joined_table;
    /// Columns will be added to block by JOIN.
    /// It's a subset of columns_from_joined_table
    /// Note: without corrected Nullability or type, see correctedColumnsAddedByJoin
    NamesAndTypesList columns_added_by_join;

    /// Target type to convert key columns before join
    NameToTypeMap left_type_map;
    NameToTypeMap right_type_map;

    /// Name -> original name. Names are the same as in columns_from_joined_table list.
    std::unordered_map<String, String> original_names;
    /// Original name -> name. Only renamed columns.
    std::unordered_map<String, String> renames;

    /// Map column name to actual key name that can be an alias.
    /// Example: SELECT r.id as rid from t JOIN r ON t.id = rid
    /// Map: r.id -> rid
    /// Required only for StorageJoin to map join keys back to original column names.
    /// (workaround for ExpressionAnalyzer)
    std::unordered_map<String, String> right_key_aliases;

    /// Returned by getGlobalTemporaryVolume().
    VolumePtr tmp_volume;

    /// The three members below are what isSpecialStorage() inspects.
    std::shared_ptr<StorageJoin> right_storage_join;

    std::shared_ptr<const IKeyValueEntity> right_kv_storage;

    std::string right_storage_name;

    /// Read and written through isJoinWithConstant() / setIsJoinWithConstant().
    bool is_join_with_constant = false;

    /// The helpers below are only declared here; their definitions are not part of this header.

    Names requiredJoinedNames() const;

    /// Create converting actions and change key column names if required
    ActionsDAGPtr applyKeyConvertToTable(
        const ColumnsWithTypeAndName & cols_src,
        const NameToTypeMap & type_mapping,
        JoinTableSide table_side,
        NameToNameMap & key_column_rename);

    /// NOTE(review): judging by the name and parameters, wraps the listed key columns for null-safe (`<=>`) comparison;
    /// `key_column_rename` is an in/out map — confirm against the definition.
    ActionsDAGPtr applyNullsafeWrapper(
        const ColumnsWithTypeAndName & cols_src,
        const NameSet & columns_for_nullsafe_comparison,
        JoinTableSide table_side,
        NameToNameMap & key_column_rename);

    ActionsDAGPtr applyJoinUseNullsConversion(
        const ColumnsWithTypeAndName & cols_src,
        const NameToNameMap & key_column_rename);

    void applyRename(JoinTableSide side, const NameToNameMap & name_map);

    /// Private counterpart of JoinOnClause::addKey that additionally receives the key ASTs.
    void addKey(const String & left_name, const String & right_name, const ASTPtr & left_ast, const ASTPtr & right_ast, bool null_safe_comparison = false);

    /// Called by getOnlyClause() before it accesses the first clause.
    void assertHasOneOnExpr() const;

    /// Calculates common supertypes for corresponding join key columns.
    template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
    void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right, bool strict);

    /// Called by setColumnsFromJoinedTable() right after `columns_from_joined_table` is assigned.
    void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);

    /// NOTE(review): the identifier is spelled "Comparion" (sic); renaming it also requires changing the out-of-line definition.
    std::pair<NameSet, NameSet>
    getKeysForNullSafeComparion(const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & right_sample_columns);

public:
    /// Every member keeps its in-class initializer.
    TableJoin() = default;

    /// Defined out of line.
    TableJoin(const Settings & settings, VolumePtr tmp_volume_);

    /// For StorageJoin: receives only the right-side key names.
    /// Creates a single clause whose right keys are `key_names_right`; its left keys stay empty.
    TableJoin(SizeLimits limits, bool use_nulls, JoinKind kind, JoinStrictness strictness,
              const Names & key_names_right)
        : size_limits(limits)
        , default_max_bytes(0)
        , join_use_nulls(use_nulls)
        , join_algorithm(JoinAlgorithm::DEFAULT)
    {
        table_join.kind = kind;
        table_join.strictness = strictness;

        JoinOnClause & only_clause = clauses.emplace_back();
        only_clause.key_names_right = key_names_right;
    }

    /// Join kind stored in `table_join`.
    JoinKind kind() const
    {
        return table_join.kind;
    }

    /// Join strictness stored in `table_join`.
    JoinStrictness strictness() const
    {
        return table_join.strictness;
    }

    bool sameStrictnessAndKind(JoinStrictness, JoinKind) const;

    /// Size limits held by this join.
    const SizeLimits & sizeLimits() const
    {
        return size_limits;
    }
    /// Returns a copy of the shared pointer to the temporary volume (null when none was set).
    /// Const-qualified because it only reads `tmp_volume`, so it is also callable on a const TableJoin.
    VolumePtr getGlobalTemporaryVolume() const { return tmp_volume; }

    ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;

    /// Tells whether algorithm `val` may be used for this join.
    bool isEnabledAlgorithm(JoinAlgorithm val) const
    {
        /// Explicitly requested algorithms are always enabled.
        if (join_algorithm.isSet(val))
            return true;

        if (!join_algorithm.isSet(JoinAlgorithm::DEFAULT))
            return false;

        /// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
        /// This is the behaviour that was initially supported by ClickHouse.
        return val == JoinAlgorithm::DEFAULT
            || val == JoinAlgorithm::HASH
            || val == JoinAlgorithm::DIRECT;
    }

    bool allowParallelHashJoin() const;

    /// Value of the `join_use_nulls` flag fixed at construction.
    bool joinUseNulls() const
    {
        return join_use_nulls;
    }

    /// True when `join_use_nulls` is set and the join kind is LEFT or FULL.
    bool forceNullableRight() const
    {
        if (!join_use_nulls)
            return false;
        return isLeftOrFull(kind());
    }

    /// True when `join_use_nulls` is set and the join kind is RIGHT or FULL.
    bool forceNullableLeft() const
    {
        if (!join_use_nulls)
            return false;
        return isRightOrFull(kind());
    }

    /// Read-only accessors for the limits and settings fixed at construction.
    size_t defaultMaxBytes() const
    {
        return default_max_bytes;
    }

    size_t maxJoinedBlockRows() const
    {
        return max_joined_block_rows;
    }

    size_t maxRowsInRightBlock() const
    {
        return partial_merge_join_rows_in_right_blocks;
    }

    size_t maxBytesInLeftBuffer() const
    {
        return partial_merge_join_left_table_buffer_bytes;
    }

    size_t maxFilesToMerge() const
    {
        return max_files_to_merge;
    }

    const String & temporaryFilesCodec() const
    {
        return temporary_files_codec;
    }
    bool needStreamWithNonJoinedRows() const;

    bool oneDisjunct() const;

    /// Mutable and read-only access to the stored ASTTableJoin.
    ASTTableJoin & getTableJoin()
    {
        return table_join;
    }
    const ASTTableJoin & getTableJoin() const
    {
        return table_join;
    }

    /// Returns the first clause after assertHasOneOnExpr() has been called.
    JoinOnClause & getOnlyClause()
    {
        assertHasOneOnExpr();
        return clauses.front();
    }
    const JoinOnClause & getOnlyClause() const
    {
        assertHasOneOnExpr();
        return clauses.front();
    }

    /// Mutable and read-only access to all clauses (one per disjunct).
    std::vector<JoinOnClause> & getClauses()
    {
        return clauses;
    }
    const std::vector<JoinOnClause> & getClauses() const
    {
        return clauses;
    }

    Names getAllNames(JoinTableSide side) const;

    void resetCollected();
    void addUsingKey(const ASTPtr & ast);

    void addDisjunct();

    void addOnKeys(ASTPtr & left_table_ast, ASTPtr & right_table_ast, bool null_safe_comparison);

    /* Conditions for left/right table from JOIN ON section.
     *
     * Conditions for left and right tables are stored separately and united with 'and' function into one column.
     * For example for query:
     * SELECT ... JOIN ... ON t1.id == t2.id AND expr11(t1) AND expr21(t2) AND expr12(t1) AND expr22(t2)
     *
     * We will build two new ASTs: `expr11(t1) AND expr12(t1)`, `expr21(t2) AND expr22(t2)`
     * Such columns will be added and calculated for left and right tables respectively.
     * Only rows where conditions are met (where new columns have non-zero value) will be joined.
     *
     * NOTE: a non-equi condition containing columns from different tables (like `... ON t1.id = t2.id AND t1.val > t2.val`)
     *     isn't supported yet, it can be added later.
     */
    void addJoinCondition(const ASTPtr & ast, bool is_left);

    /// True when the stored ASTTableJoin has a USING expression list.
    bool hasUsing() const
    {
        return table_join.using_expression_list != nullptr;
    }

    /// True when the stored ASTTableJoin has an ON expression.
    bool hasOn() const
    {
        return table_join.on_expression != nullptr;
    }

    String getOriginalName(const String & column_name) const;
    NamesWithAliases getNamesWithAliases(const NameSet & required_columns) const;
    NamesWithAliases getRequiredColumns(const Block & sample, const Names & action_required_columns) const;

    size_t rightKeyInclusion(const String & name) const;
    NameSet requiredRightKeys() const;

    /// Getter and setter for the `is_join_with_constant` flag (false by default).
    bool isJoinWithConstant() const { return is_join_with_constant; }

    void setIsJoinWithConstant(bool is_join_with_constant_value) { is_join_with_constant = is_join_with_constant_value; }

    bool leftBecomeNullable(const DataTypePtr & column_type) const;
    bool rightBecomeNullable(const DataTypePtr & column_type) const;
    void addJoinedColumn(const NameAndTypePair & joined_column);
    /// Replaces the whole `columns_added_by_join` list with a copy of the given one.
    void setColumnsAddedByJoin(const NamesAndTypesList & columns_added_by_join_value) { columns_added_by_join = columns_added_by_join_value; }

    template <typename TColumns>
    void addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, bool correct_nullability);

    void addJoinedColumnsAndCorrectTypes(NamesAndTypesList & left_columns, bool correct_nullability);
    void addJoinedColumnsAndCorrectTypes(ColumnsWithTypeAndName & left_columns, bool correct_nullability);

    /// Calculate converting actions, rename key columns if required.
    /// For `USING` join we will convert key columns in place, which affects their types in the result table.
    /// For `JOIN ON` we will create new columns with converted keys to join by.
    std::pair<ActionsDAGPtr, ActionsDAGPtr>
    createConvertingActions(
        const ColumnsWithTypeAndName & left_sample_columns,
        const ColumnsWithTypeAndName & right_sample_columns);

    /// Setter and getter for the ASOF join inequality (GreaterOrEquals unless changed).
    void setAsofInequality(ASOFJoinInequality inequality)
    {
        asof_inequality = inequality;
    }

    ASOFJoinInequality getAsofInequality() const
    {
        return asof_inequality;
    }

    ASTPtr leftKeysList() const;
    ASTPtr rightKeysList() const; /// For ON syntax only

    /// Takes ownership of the joined table's column list, then calls deduplicateAndQualifyColumnNames()
    /// with the left table columns and the right table prefix.
    void setColumnsFromJoinedTable(
        NamesAndTypesList columns_from_joined_table_value,
        const NameSet & left_table_columns,
        const String & right_table_prefix)
    {
        columns_from_joined_table = std::move(columns_from_joined_table_value);
        deduplicateAndQualifyColumnNames(left_table_columns, right_table_prefix);
    }

    /// All columns that can be read from the joined table.
    const NamesAndTypesList & columnsFromJoinedTable() const
    {
        return columns_from_joined_table;
    }

    /// Columns added by the join (see correctedColumnsAddedByJoin for the variant with corrected types).
    const NamesAndTypesList & columnsAddedByJoin() const
    {
        return columns_added_by_join;
    }

    /// StorageJoin overrides key names (cause of different names qualification)
    void setRightKeys(const Names & keys) { getOnlyClause().key_names_right = keys; }
    void setLeftKeys(const Names & keys) { getOnlyClause().key_names_left = keys; }

    Block getRequiredRightKeys(const Block & right_table_keys, std::vector<String> & keys_sources) const;

    String renamedRightColumnName(const String & name) const;
    String renamedRightColumnNameWithAlias(const String & name) const;
    void setRename(const String & from, const String & to);

    void resetKeys();
    void resetToCross();

    std::unordered_map<String, String> leftToRightKeyRemap() const;

    /// Remember storage name in case of joining with dictionary or another special storage
    void setRightStorageName(const std::string & storage_name);
    const std::string & getRightStorageName() const;

    void setStorageJoin(std::shared_ptr<const IKeyValueEntity> storage);
    void setStorageJoin(std::shared_ptr<StorageJoin> storage);

    /// Returns the StorageJoin set for the right side (null when none was set).
    std::shared_ptr<StorageJoin> getStorageJoin() const
    {
        return right_storage_join;
    }

    /// True when any special right-side storage is recorded: a storage name, a StorageJoin or a key-value entity.
    bool isSpecialStorage() const
    {
        if (!right_storage_name.empty())
            return true;
        if (right_storage_join)
            return true;
        return right_kv_storage != nullptr;
    }

    std::shared_ptr<const IKeyValueEntity> getStorageKeyValue() { return right_kv_storage; }

    NamesAndTypesList correctedColumnsAddedByJoin() const;
};

}