1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/IKeyValueEntity.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/PODArray_fwd.h>
#include <Common/logger_useful.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <span>
namespace DB
{
namespace ErrorCodes
{
/// Only declared here (extern); the definition lives outside this header.
/// Used as the exception code in StorageKeeperMap::checkTable().
extern const int INVALID_STATE;
}
// KV store using (Zoo|CH)Keeper
class StorageKeeperMap final : public IStorage, public IKeyValueEntity, WithContext
{
public:
StorageKeeperMap(
ContextPtr context_,
const StorageID & table_id,
const StorageInMemoryMetadata & metadata,
bool attach,
std::string_view primary_key_,
const std::string & root_path_,
UInt64 keys_limit_);
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool async_insert) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
void drop() override;
NamesAndTypesList getVirtuals() const override;
std::string getName() const override { return "KeeperMap"; }
Names getPrimaryKey() const override { return {primary_key}; }
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const;
Block getSampleBlock(const Names &) const override;
void checkTableCanBeRenamed(const StorageID & new_name) const override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
bool supportsParallelInsert() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(
const ASTPtr & node, ContextPtr /*query_context*/, const StorageMetadataPtr & /*metadata_snapshot*/) const override
{
return node->getColumnName() == primary_key;
}
bool supportsDelete() const override { return true; }
zkutil::ZooKeeperPtr getClient() const;
const std::string & dataPath() const;
std::string fullPathForKey(std::string_view key) const;
UInt64 keysLimit() const;
template <bool throw_on_error>
void checkTable() const
{
auto is_table_valid = isTableValid();
if (!is_table_valid.has_value())
{
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
"once a connection is established and metadata is verified";
if constexpr (throw_on_error)
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
else
{
LOG_ERROR(log, error_msg);
return;
}
}
if (!*is_table_valid)
{
static constexpr auto error_msg
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DETACH table";
if constexpr (throw_on_error)
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
else
{
LOG_ERROR(log, error_msg);
return;
}
}
}
private:
bool dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock);
std::optional<bool> isTableValid() const;
std::string root_path;
std::string primary_key;
std::string data_path;
std::string metadata_path;
std::string tables_path;
std::string table_path;
std::string dropped_path;
std::string dropped_lock_path;
std::string zookeeper_name;
std::string metadata_string;
uint64_t keys_limit{0};
mutable std::mutex zookeeper_mutex;
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
mutable std::mutex init_mutex;
mutable std::optional<bool> table_is_valid;
Poco::Logger * log;
};
}
|