1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
#include "RedisCommon.h"
#include <Common/Exception.h>
#include <Common/parseAddress.h>
#include <Interpreters/evaluateConstantExpression.h>
namespace DB
{
namespace ErrorCodes
{
/// Error codes raised by the helpers in this file.
/// They are only declared here (`extern`); their definitions are provided elsewhere.
extern const int INTERNAL_REDIS_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int INVALID_REDIS_STORAGE_TYPE;
}
/// Stores the pool handle together with the client taken from it.
/// Both arguments are received by value and moved into the members.
RedisConnection::RedisConnection(RedisPoolPtr pool_, RedisClientPtr client_)
    : pool(std::move(pool_))
    , client(std::move(client_))
{
}
/// Gives the held client back to the pool when the connection object is destroyed.
RedisConnection::~RedisConnection()
{
    pool->returnObject(std::move(client));
}
/// Converts a storage layout into the name of the corresponding Redis key type:
/// SIMPLE -> "string", HASH_MAP -> "hash", anything else -> "none".
/// NOTE(review): the names look like the replies of the Redis TYPE command — confirm against callers.
String storageTypeToKeyType(RedisStorageType type)
{
    /// Every branch returns (the `default` covers unknown values), so nothing may follow the switch.
    switch (type)
    {
        case RedisStorageType::SIMPLE:
            return "string";
        case RedisStorageType::HASH_MAP:
            return "hash";
        default:
            return "none";
    }
}
/// Produces the textual name of a storage layout:
/// SIMPLE -> "simple", HASH_MAP -> "hash_map", anything else -> "none".
String serializeStorageType(RedisStorageType storage_type)
{
    if (storage_type == RedisStorageType::SIMPLE)
        return "simple";

    if (storage_type == RedisStorageType::HASH_MAP)
        return "hash_map";

    /// Any value outside the two known layouts.
    return "none";
}
/// Parses the textual storage layout name.
/// "hash_map" selects HASH_MAP; "simple" and the empty string select SIMPLE.
/// Any other value throws INVALID_REDIS_STORAGE_TYPE.
RedisStorageType parseStorageType(const String & storage_type_str)
{
    /// An empty string is treated the same way as an explicit "simple".
    if (storage_type_str.empty() || storage_type_str == "simple")
        return RedisStorageType::SIMPLE;

    if (storage_type_str == "hash_map")
        return RedisStorageType::HASH_MAP;

    throw Exception(ErrorCodes::INVALID_REDIS_STORAGE_TYPE, "Unknown storage type {} for Redis dictionary", storage_type_str);
}
/// Borrows a client from `pool` and wraps it into a RedisConnection.
///
/// If the borrowed client is not connected yet, it is connected to `configuration.host:port`,
/// authenticated with AUTH when a password is configured, and switched with SELECT when
/// `configuration.db_index` is non-zero.
///
/// Throws TIMEOUT_EXCEEDED when no client could be borrowed within REDIS_LOCK_ACQUIRE_TIMEOUT_MS,
/// and INTERNAL_REDIS_ERROR when AUTH or SELECT is answered with anything other than "OK".
/// Any exception raised during the connection setup is rethrown after the client has been
/// disconnected (if needed) and returned to the pool.
RedisConnectionPtr getRedisConnection(RedisPoolPtr pool, const RedisConfiguration & configuration)
{
    RedisClientPtr client;
    bool ok = pool->tryBorrowObject(client,
        [] { return std::make_unique<Poco::Redis::Client>(); },
        REDIS_LOCK_ACQUIRE_TIMEOUT_MS);

    /// The timeout constant is expressed in milliseconds, so the message must say so as well.
    if (!ok)
        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
            "Could not get connection from pool, timeout exceeded {} milliseconds",
            REDIS_LOCK_ACQUIRE_TIMEOUT_MS);

    if (!client->isConnected())
    {
        try
        {
            client->connect(configuration.host, configuration.port);

            if (!configuration.password.empty())
            {
                RedisCommand command("AUTH");
                command << configuration.password;
                String reply = client->execute<String>(command);
                if (reply != "OK")
                    throw Exception(ErrorCodes::INTERNAL_REDIS_ERROR,
                        "Authentication failed with reason {}", reply);
            }

            if (configuration.db_index != 0)
            {
                RedisCommand command("SELECT");
                command << std::to_string(configuration.db_index);
                String reply = client->execute<String>(command);
                if (reply != "OK")
                    throw Exception(ErrorCodes::INTERNAL_REDIS_ERROR,
                        "Selecting database with index {} failed with reason {}",
                        configuration.db_index, reply);
            }
        }
        catch (...)
        {
            /// Do not hand a half-initialized connection to the next borrower:
            /// drop the connection first, then give the client back to the pool.
            if (client->isConnected())
                client->disconnect();

            pool->returnObject(std::move(client));
            throw;
        }
    }

    /// `pool` is not used after this point, so it is moved instead of copied.
    return std::make_unique<RedisConnection>(std::move(pool), std::move(client));
}
/// Runs HKEYS for every entry of `keys` and collects the results as arrays of the form
/// [primary_key, field_1, ..., field_n]. A primary key whose fields do not fit into one
/// array is spread over several arrays, each starting with that primary key and holding
/// at most REDIS_MAX_BLOCK_SIZE fields. Primary keys with a null reply are skipped.
RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisArray & keys)
{
    RedisArrayPtr result = std::make_shared<RedisArray>();

    for (const auto & primary_key : keys)
    {
        RedisCommand hkeys_command("HKEYS");
        hkeys_command.addRedisType(primary_key);

        auto fields = connection->client->execute<RedisArray>(hkeys_command);
        if (fields.isNull())
            continue;

        /// The first element of every batch is the primary key itself.
        RedisArray batch;
        batch.addRedisType(primary_key);

        for (const auto & field : fields)
        {
            batch.addRedisType(field);

            /// Keep filling until the batch holds the primary key plus REDIS_MAX_BLOCK_SIZE fields.
            if (batch.size() != REDIS_MAX_BLOCK_SIZE + 1)
                continue;

            /// The batch is full: emit it and start a new one for the same primary key.
            result->add(batch);
            batch.clear();
            batch.addRedisType(primary_key);
        }

        /// Emit the remainder only if it contains at least one field besides the primary key.
        if (batch.size() > 1)
            result->add(batch);
    }

    return result;
}
}
|