1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
#include "IBridgeHelper.h"
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadHelpers.h>
#include <filesystem>
#include <thread>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
void IBridgeHelper::startBridgeSync()
{
if (!bridgeHandShake())
{
LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
startBridge(startBridgeCommand());
bool started = false;
uint64_t milliseconds_to_wait = 10; /// Exponential backoff
uint64_t counter = 0;
while (milliseconds_to_wait < 10000)
{
++counter;
LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
if (bridgeHandShake())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
milliseconds_to_wait *= 2;
}
if (!started)
throw Exception(ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING, "BridgeHelper: {} is not responding", serviceAlias());
}
}
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand()
{
if (startBridgeManually())
throw Exception(ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING, "{} is not running. Please, start it manually", serviceAlias());
const auto & config = getConfig();
/// Path to executable folder
fs::path path(config.getString("application.dir", "/usr/bin"));
std::vector<std::string> cmd_args;
path /= serviceFileName();
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt(configPrefix() + ".port", getDefaultPort())));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString(configPrefix() + ".listen_host", DEFAULT_HOST));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(getHTTPTimeout().totalMicroseconds()));
cmd_args.push_back("--http-max-field-value-size");
cmd_args.push_back("99999999999999999"); // something "big" to accept large datasets (issue 47616)
if (config.has("logger." + configPrefix() + "_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_log"));
}
if (config.has("logger." + configPrefix() + "_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_level"));
}
LOG_TRACE(getLog(), "Starting {}", serviceAlias());
/// We will terminate it with the KILL signal instead of the TERM signal,
/// because it's more reliable for arbitrary third-party ODBC drivers.
/// The drivers can spawn threads, install their own signal handlers... we don't care.
ShellCommand::Config command_config(path.string());
command_config.arguments = cmd_args;
command_config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy(true, SIGKILL);
return ShellCommand::executeDirect(command_config);
}
}
|