aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Interpreters/FullSortingMergeJoin.h
blob: 3fc9f8920ed7e927077f7c05816c5031b6787df2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#pragma once

#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h>

namespace DB
{

/// Error codes used by the exceptions thrown in this header.
/// They are only declared here (`extern`); their definitions are provided elsewhere.
namespace ErrorCodes
{
    /// Used by the IJoin methods of FullSortingMergeJoin that should never be called.
    extern const int LOGICAL_ERROR;
    /// Used when FullSortingMergeJoin::isSupported rejects the join description.
    extern const int NOT_IMPLEMENTED;
    /// Used when the types of a left and a right join key are not equal.
    extern const int TYPE_MISMATCH;
}

/// Dummy class, actual joining is done by MergeTransform
class FullSortingMergeJoin : public IJoin
{
public:
    explicit FullSortingMergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_)
        : table_join(table_join_)
        , right_sample_block(right_sample_block_)
    {
        LOG_TRACE(&Poco::Logger::get("FullSortingMergeJoin"), "Will use full sorting merge join");
    }

    std::string getName() const override { return "FullSortingMergeJoin"; }
    const TableJoin & getTableJoin() const override { return *table_join; }

    bool addBlockToJoin(const Block & /* block */, bool /* check_limits */) override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addBlockToJoin should not be called");
    }

    static bool isSupported(const std::shared_ptr<TableJoin> & table_join)
    {
        if (!table_join->oneDisjunct())
            return false;

        bool support_storage = !table_join->isSpecialStorage();

        const auto & on_expr = table_join->getOnlyClause();
        bool support_conditions = !on_expr.on_filter_condition_left && !on_expr.on_filter_condition_right;

        if (!on_expr.analyzer_left_filter_condition_column_name.empty() ||
            !on_expr.analyzer_right_filter_condition_column_name.empty())
            support_conditions = false;

        /// Key column can change nullability and it's not handled on type conversion stage, so algorithm should be aware of it
        bool support_using_and_nulls = !table_join->hasUsing() || !table_join->joinUseNulls();

        return support_conditions && support_using_and_nulls && support_storage;
    }

    void checkTypesOfKeys(const Block & left_block) const override
    {
        if (!isSupported(table_join))
            throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "FullSortingMergeJoin doesn't support specified query");

        const auto & onexpr = table_join->getOnlyClause();
        for (size_t i = 0; i < onexpr.key_names_left.size(); ++i)
        {
            DataTypePtr left_type = left_block.getByName(onexpr.key_names_left[i]).type;
            DataTypePtr right_type = right_sample_block.getByName(onexpr.key_names_right[i]).type;

            bool type_equals
                = table_join->hasUsing() ? left_type->equals(*right_type) : removeNullable(left_type)->equals(*removeNullable(right_type));

            /// Even slightly different types should be converted on previous pipeline steps.
            /// If we still have some differences, we can't join, because the algorithm expects strict type equality.
            if (!type_equals)
            {
                throw DB::Exception(
                    ErrorCodes::TYPE_MISMATCH,
                    "Type mismatch of columns to JOIN by: {} :: {} at left, {} :: {} at right",
                    onexpr.key_names_left[i], left_type->getName(),
                    onexpr.key_names_right[i], right_type->getName());
            }
        }
    }

    /// Used just to get result header
    void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /* not_processed */) override
    {
        for (const auto & col : right_sample_block)
            block.insert(col);
        block = materializeBlock(block).cloneEmpty();
    }

    void setTotals(const Block & block) override { totals = block; }
    const Block & getTotals() const override { return totals; }

    size_t getTotalRowCount() const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getTotalRowCount should not be called");
    }

    size_t getTotalByteCount() const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getTotalByteCount should not be called");
    }

    bool alwaysReturnsEmptySet() const override { return false; }

    IBlocksStreamPtr
    getNonJoinedBlocks(const Block & /* left_sample_block */, const Block & /* result_sample_block */, UInt64 /* max_block_size */) const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getNonJoinedBlocks should not be called");
    }

    /// Left and right streams have the same priority and are processed simultaneously
    JoinPipelineType pipelineType() const override { return JoinPipelineType::YShaped; }

private:
    std::shared_ptr<TableJoin> table_join;
    Block right_sample_block;
    Block totals;
};

}