aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/ReadFinalForExternalReplicaStorage.cpp
blob: 28053c84e204ac1071278a302963e934b508fb81 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <Storages/ReadFinalForExternalReplicaStorage.h>

#if USE_MYSQL || USE_LIBPQXX

#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>


namespace DB
{

bool needRewriteQueryWithFinalForStorage(const Names & column_names, const StoragePtr & storage)
{
    const StorageMetadataPtr & metadata = storage->getInMemoryMetadataPtr();
    Block header = metadata->getSampleBlock();
    ColumnWithTypeAndName & version_column = header.getByPosition(header.columns() - 1);
    return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end();
}

void readFinalFromNestedStorage(
    QueryPlan & query_plan,
    StoragePtr nested_storage,
    const Names & column_names,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
    auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
    const auto & nested_metadata = nested_storage->getInMemoryMetadataPtr();

    Block nested_header = nested_metadata->getSampleBlock();
    ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);

    String filter_column_name;
    Names require_columns_name = column_names;
    ASTPtr expressions = std::make_shared<ASTExpressionList>();
    if (column_names_set.empty() || !column_names_set.contains(sign_column.name))
    {
        require_columns_name.emplace_back(sign_column.name);

        const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
        const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(static_cast<Int8>(1)));

        expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
        filter_column_name = expressions->children.back()->getColumnName();
    }

    auto nested_snapshot = nested_storage->getStorageSnapshot(nested_metadata, context);
    nested_storage->read(
        query_plan, require_columns_name, nested_snapshot, query_info, context, processed_stage, max_block_size, num_streams);

    if (!query_plan.isInitialized())
    {
        InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, nested_header, query_info, context);
        return;
    }

    query_plan.addTableLock(lock);
    query_plan.addStorageHolder(nested_storage);

    if (!expressions->children.empty())
    {
        const auto & header = query_plan.getCurrentDataStream().header;
        auto syntax = TreeRewriter(context).analyze(expressions, header.getNamesAndTypesList());
        auto actions = ExpressionAnalyzer(expressions, syntax, context).getActionsDAG(true /* add_aliases */, false /* project_result */);

        auto step = std::make_unique<FilterStep>(
            query_plan.getCurrentDataStream(),
            actions,
            filter_column_name,
            false);

        step->setStepDescription("Filter columns");
        query_plan.addStep(std::move(step));
    }
}
}

#endif