1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
#pragma once
#include "clickhouse_config.h"
#if USE_LIBPQXX
#include <Core/Block.h>
#include <Processors/ISource.h>
#include <Core/ExternalResultDescription.h>
#include <Core/Field.h>
#include <Core/PostgreSQL/insertPostgreSQLValue.h>
#include <Core/PostgreSQL/ConnectionHolder.h>
#include <Core/PostgreSQL/Utils.h>
namespace DB
{
template <typename T = pqxx::ReadTransaction>
class PostgreSQLSource : public ISource
{
public:
PostgreSQLSource(
postgres::ConnectionHolderPtr connection_holder_,
const String & query_str_,
const Block & sample_block,
UInt64 max_block_size_);
String getName() const override { return "PostgreSQL"; }
~PostgreSQLSource() override;
protected:
PostgreSQLSource(
std::shared_ptr<T> tx_,
const std::string & query_str_,
const Block & sample_block,
UInt64 max_block_size_,
bool auto_commit_);
String query_str;
std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream;
Status prepare() override;
void onStart();
Chunk generate() override;
void onFinish();
private:
void init(const Block & sample_block);
const UInt64 max_block_size;
bool auto_commit = true;
ExternalResultDescription description;
bool started = false;
bool is_completed = false;
postgres::ConnectionHolderPtr connection_holder;
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
};
/// Passes transaction object into PostgreSQLSource and does not close transaction after read is finished.
template <typename T>
class PostgreSQLTransactionSource : public PostgreSQLSource<T>
{
public:
using Base = PostgreSQLSource<T>;
PostgreSQLTransactionSource(
std::shared_ptr<T> tx_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_)
: PostgreSQLSource<T>(tx_, query_str_, sample_block_, max_block_size_, false) {}
};
}
#endif
|