aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Server/ReplicasStatusHandler.cpp
blob: 8c0ab0c1a3bd126ccf6cc7c8ddc015d68c31b0fc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include <Server/ReplicasStatusHandler.h>

#include <Databases/IDatabase.h>
#include <IO/HTTPCommon.h>
#include <Interpreters/Context.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/IServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/typeid_cast.h>

#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>


namespace DB
{

/// Builds the handler, handing the context obtained from `server` to the WithContext base.
ReplicasStatusHandler::ReplicasStatusHandler(IServer & server)
    : WithContext(server.context())
{
}

/// Answers a replicas-status HTTP request.
///
/// Walks every table of every database that can contain MergeTree tables and, for each
/// StorageReplicatedMergeTree found, appends one line to a textual report:
///  - read-only tables are listed as such and do not influence the response status;
///  - for the other tables the absolute and relative replica delays are printed, and the
///    response becomes 503 (Service Unavailable) as soon as one delay reaches the matching
///    `min_absolute_delay_to_close` / `min_relative_delay_to_close` setting
///    (a setting equal to zero switches that particular check off).
///
/// The report is sent when the request carries `verbose=1` or when the status is 503;
/// otherwise the body is just "Ok.\n".
///
/// Any exception is logged; if nothing has been sent yet, the exception message is sent
/// to the client with status 500. A failure while doing that is only logged.
void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
    try
    {
        HTMLForm form(getContext()->getSettingsRef(), request);

        /// With "verbose=1" the per-table report is returned even if no delay is too large.
        bool send_report = (form.get("verbose", "") == "1");

        const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();

        bool all_replicas_ok = true;
        WriteBufferFromOwnString report;

        auto databases = DatabaseCatalog::instance().getDatabases();

        /// Look at every replicated table of every database.
        for (const auto & [database_name, database] : databases)
        {
            /// Databases that cannot hold MergeTree tables have nothing to report.
            if (!database->canContainMergeTreeTables())
                continue;

            for (auto tables = database->getTablesIterator(getContext()); tables->isValid(); tables->next())
            {
                const auto & storage = tables->table();

                /// Skip empty slots and everything that is not a replicated MergeTree.
                auto * replicated = storage ? dynamic_cast<StorageReplicatedMergeTree *>(storage.get()) : nullptr;
                if (!replicated)
                    continue;

                if (replicated->isTableReadOnly())
                {
                    /// Delays are not queried for a read-only table; it is only mentioned in the report.
                    report << backQuoteIfNeed(database_name) << "." << backQuoteIfNeed(tables->name())
                        << ":\tis readonly. \n";
                    continue;
                }

                time_t absolute_delay = 0;
                time_t relative_delay = 0;
                replicated->getReplicaDelays(absolute_delay, relative_delay);

                const bool absolute_delay_too_big = settings.min_absolute_delay_to_close
                    && absolute_delay >= static_cast<time_t>(settings.min_absolute_delay_to_close);
                const bool relative_delay_too_big = settings.min_relative_delay_to_close
                    && relative_delay >= static_cast<time_t>(settings.min_relative_delay_to_close);

                if (absolute_delay_too_big || relative_delay_too_big)
                    all_replicas_ok = false;

                report << backQuoteIfNeed(database_name) << "." << backQuoteIfNeed(tables->name())
                    << ":\tAbsolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".\n";
            }
        }

        const auto & config = getContext()->getConfigRef();
        setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", 10));

        if (!all_replicas_ok)
        {
            /// A lagging replica always gets the detailed report together with the 503 status.
            response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
            send_report = true;
        }

        if (send_report)
        {
            *response.send() << report.str();
        }
        else
        {
            static constexpr char ok_body[] = "Ok.\n";
            /// sizeof counts the terminating zero byte, which must not be sent.
            response.sendBuffer(ok_body, sizeof(ok_body) - 1);
        }
    }
    catch (...)
    {
        tryLogCurrentException("ReplicasStatusHandler");

        try
        {
            response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);

            /// The error text can be delivered only while nothing has been sent to the client.
            const bool nothing_sent_yet = !response.sent();
            if (nothing_sent_yet)
            {
                /// We have not sent anything yet and we don't even know if we need to compress response.
                *response.send() << getCurrentExceptionMessage(false) << std::endl;
            }
        }
        catch (...)
        {
            LOG_ERROR((&Poco::Logger::get("ReplicasStatusHandler")), "Cannot send exception to client");
        }
    }
}

/// Creates the handler factory for ReplicasStatusHandler bound to `server` and
/// lets it add its request filters from `config` under `config_prefix`.
HTTPRequestHandlerFactoryPtr createReplicasStatusHandlerFactory(IServer & server,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix)
{
    using ReplicasStatusFactory = HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>;

    std::shared_ptr<ReplicasStatusFactory> handler_factory = std::make_shared<ReplicasStatusFactory>(server);
    handler_factory->addFiltersFromConfig(config, config_prefix);

    return handler_factory;
}

}