aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/InputFormatErrorsLogger.cpp
blob: 71d51f0e04af8002fc68793080779aed4320c373 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/filesystemHelpers.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int DATABASE_ACCESS_DENIED;
}

namespace
{
    const String DEFAULT_OUTPUT_FORMAT = "CSV";
}

InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
{
    String output_format = context->getSettingsRef().errors_output_format;
    if (!FormatFactory::instance().isOutputFormat(output_format))
        output_format = DEFAULT_OUTPUT_FORMAT;
    if (context->hasInsertionTable())
        table = context->getInsertionTable().getTableName();
    if (context->getInsertionTable().hasDatabase())
        database = context->getInsertionTable().getDatabaseName();

    String path_in_setting = context->getSettingsRef().input_format_record_errors_file_path;

    if (context->getApplicationType() == Context::ApplicationType::SERVER)
    {
        auto user_files_path = context->getUserFilesPath();
        errors_file_path = fs::path(user_files_path) / path_in_setting;
        if (!fileOrSymlinkPathStartsWith(errors_file_path, user_files_path))
            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
                            "Cannot log errors in path `{}`, because it is not inside `{}`",
                            errors_file_path, user_files_path);
    }
    else
    {
        errors_file_path = path_in_setting;
    }

    while (fs::exists(errors_file_path))
    {
        errors_file_path += "_new";
    }
    write_buf = std::make_shared<WriteBufferFromFile>(errors_file_path);

    header = Block{
        {std::make_shared<DataTypeDateTime>(), "time"},
        {std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "database"},
        {std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "table"},
        {std::make_shared<DataTypeUInt32>(), "offset"},
        {std::make_shared<DataTypeString>(), "reason"},
        {std::make_shared<DataTypeString>(), "raw_data"}};

    writer = context->getOutputFormat(output_format, *write_buf, header);
}

InputFormatErrorsLogger::~InputFormatErrorsLogger()
{
    writer->finalize();
    writer->flush();
    write_buf->finalize();
}

void InputFormatErrorsLogger::logErrorImpl(ErrorEntry entry)
{
    auto error = header.cloneEmpty();
    auto columns = error.mutateColumns();
    columns[0]->insert(entry.time);
    database.empty() ? columns[1]->insertDefault() : columns[1]->insert(database);
    table.empty() ? columns[2]->insertDefault() : columns[2]->insert(table);
    columns[3]->insert(entry.offset);
    columns[4]->insert(entry.reason);
    columns[5]->insert(entry.raw_data);
    error.setColumns(std::move(columns));

    writer->write(error);
}

void InputFormatErrorsLogger::logError(ErrorEntry entry)
{
    logErrorImpl(entry);
}

ParallelInputFormatErrorsLogger::~ParallelInputFormatErrorsLogger() = default;

void ParallelInputFormatErrorsLogger::logError(ErrorEntry entry)
{
    std::lock_guard lock(write_mutex);
    logErrorImpl(entry);
}

}