1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
#include "DictionarySourceHelpers.h"
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include "DictionaryStructure.h"
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/SettingsChanges.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
/// For simple key
Block blockForIds(
const DictionaryStructure & dict_struct,
const std::vector<UInt64> & ids)
{
auto column = ColumnUInt64::create(ids.size());
memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front()));
Block block{{std::move(column), std::make_shared<DataTypeUInt64>(), (*dict_struct.id).name}};
return block;
}
/// For composite key
Block blockForKeys(
const DictionaryStructure & dict_struct,
const Columns & key_columns,
const std::vector<size_t> & requested_rows)
{
Block block;
for (size_t i = 0, size = key_columns.size(); i < size; ++i)
{
const ColumnPtr & source_column = key_columns[i];
size_t column_rows_size = source_column->size();
PaddedPODArray<UInt8> filter(column_rows_size, false);
for (size_t idx : requested_rows)
filter[idx] = true;
auto filtered_column = source_column->filter(filter, requested_rows.size());
block.insert({filtered_column, (*dict_struct.key)[i].type, (*dict_struct.key)[i].name});
}
return block;
}
SettingsChanges readSettingsFromDictionaryConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
if (!config.has(config_prefix + ".settings"))
return {};
const auto prefix = config_prefix + ".settings";
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(prefix, config_keys);
SettingsChanges changes;
for (const std::string & key : config_keys)
{
const auto value = config.getString(prefix + "." + key);
changes.emplace_back(key, value);
}
return changes;
}
ContextMutablePtr copyContextAndApplySettingsFromDictionaryConfig(
const ContextPtr & context, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
auto context_copy = Context::createCopy(context);
auto changes = readSettingsFromDictionaryConfig(config, config_prefix);
context_copy->applySettingsChanges(changes);
return context_copy;
}
static Block transformHeader(Block header, Block block_to_add)
{
for (Int64 i = static_cast<Int64>(block_to_add.columns() - 1); i >= 0; --i)
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
return header;
}
TransformWithAdditionalColumns::TransformWithAdditionalColumns(
Block block_to_add_, const Block & header)
: ISimpleTransform(header, transformHeader(header, block_to_add_), true)
, block_to_add(std::move(block_to_add_))
{
}
void TransformWithAdditionalColumns::transform(Chunk & chunk)
{
if (chunk)
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, num_rows);
if (cut_block.rows() != num_rows)
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH,
"Number of rows in block to add after cut must equal to number of rows in block from inner stream");
for (Int64 i = static_cast<Int64>(cut_block.columns() - 1); i >= 0; --i)
columns.insert(columns.begin(), cut_block.getByPosition(i).column);
current_range_index += num_rows;
chunk.setColumns(std::move(columns), num_rows);
}
}
String TransformWithAdditionalColumns::getName() const
{
return "TransformWithAdditionalColumns";
}
}
|