aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author robot-piglet <robot-piglet@yandex-team.com> 2023-12-09 10:11:16 +0300
committer robot-piglet <robot-piglet@yandex-team.com> 2023-12-09 10:33:35 +0300
commit fa9347ea5cf4447897b525032be9a711cc3dc583 (patch)
tree 4f3d4f493e4cfb43a3c8b5f7e279621c41e0e978
parent f82bfd2a08a51c4815a4cde64974f819ed4f7128 (diff)
download ydb-fa9347ea5cf4447897b525032be9a711cc3dc583.tar.gz
Intermediate changes
-rw-r--r-- contrib/clickhouse/src/Common/ya.make 1
-rw-r--r-- contrib/clickhouse/src/Core/Settings.h 1
-rw-r--r-- contrib/clickhouse/src/Formats/FormatFactory.cpp 7
-rw-r--r-- contrib/clickhouse/src/Formats/FormatSettings.h 6
-rw-r--r-- contrib/clickhouse/src/Formats/JSONUtils.cpp 6
-rw-r--r-- contrib/clickhouse/src/Formats/JSONUtils.h 2
-rw-r--r-- contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp 86
-rw-r--r-- contrib/clickhouse/src/IO/PeekableWriteBuffer.h 59
-rw-r--r-- contrib/clickhouse/src/Interpreters/executeQuery.cpp 18
-rw-r--r-- contrib/clickhouse/src/Interpreters/executeQuery.h 5
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/IOutputFormat.h 8
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.cpp 3
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.h 1
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp 22
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h 10
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp 22
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h 11
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp 12
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h 1
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp 14
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.h 5
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp 52
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h 31
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp 16
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.h 5
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h 36
-rw-r--r-- contrib/clickhouse/src/Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h 104
-rw-r--r-- contrib/clickhouse/src/Server/HTTPHandler.cpp 74
-rw-r--r-- contrib/clickhouse/src/Server/HTTPHandler.h 2
29 files changed, 542 insertions, 78 deletions
diff --git a/contrib/clickhouse/src/Common/ya.make b/contrib/clickhouse/src/Common/ya.make
index e81b00e118..a5813e2209 100644
--- a/contrib/clickhouse/src/Common/ya.make
+++ b/contrib/clickhouse/src/Common/ya.make
@@ -317,6 +317,7 @@ SRCS(
src/IO/OpenedFile.cpp
src/IO/ParallelReadBuffer.cpp
src/IO/PeekableReadBuffer.cpp
+ src/IO/PeekableWriteBuffer.cpp
src/IO/Progress.cpp
src/IO/ReadBuffer.cpp
src/IO/ReadBufferFromEncryptedFile.cpp
diff --git a/contrib/clickhouse/src/Core/Settings.h b/contrib/clickhouse/src/Core/Settings.h
index e402e146b6..b3fe90b864 100644
--- a/contrib/clickhouse/src/Core/Settings.h
+++ b/contrib/clickhouse/src/Core/Settings.h
@@ -279,6 +279,7 @@ class IColumn;
\
M(UInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.", 0) \
M(Bool, http_wait_end_of_query, false, "Enable HTTP response buffering on the server-side.", 0) \
+ M(Bool, http_write_exception_in_output_format, false, "Write exception in output format to produce valid output. Works with JSON and XML formats.", 0) \
M(UInt64, http_response_buffer_size, 0, "The number of bytes to buffer in the server memory before sending a HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
\
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
diff --git a/contrib/clickhouse/src/Formats/FormatFactory.cpp b/contrib/clickhouse/src/Formats/FormatFactory.cpp
index c349ebd94e..d7348527b9 100644
--- a/contrib/clickhouse/src/Formats/FormatFactory.cpp
+++ b/contrib/clickhouse/src/Formats/FormatFactory.cpp
@@ -1,7 +1,6 @@
#include <Formats/FormatFactory.h>
#include <algorithm>
-#include <Core/Settings.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
@@ -229,6 +228,12 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
context->getRemoteHostFilter().checkURL(avro_schema_registry_url);
}
+ if (context->getClientInfo().interface == ClientInfo::Interface::HTTP && context->getSettingsRef().http_write_exception_in_output_format.value)
+ {
+ format_settings.json.valid_output_on_exception = true;
+ format_settings.xml.valid_output_on_exception = true;
+ }
+
return format_settings;
}
diff --git a/contrib/clickhouse/src/Formats/FormatSettings.h b/contrib/clickhouse/src/Formats/FormatSettings.h
index 0c760f9151..1de2f1da5e 100644
--- a/contrib/clickhouse/src/Formats/FormatSettings.h
+++ b/contrib/clickhouse/src/Formats/FormatSettings.h
@@ -198,6 +198,7 @@ struct FormatSettings
bool validate_types_from_metadata = true;
bool validate_utf8 = false;
bool allow_object_type = false;
+ bool valid_output_on_exception = false;
bool compact_allow_variable_number_of_columns = false;
} json;
@@ -405,6 +406,11 @@ struct FormatSettings
{
bool allow_types_conversion = true;
} native;
+
+ struct
+ {
+ bool valid_output_on_exception = false;
+ } xml;
};
}
diff --git a/contrib/clickhouse/src/Formats/JSONUtils.cpp b/contrib/clickhouse/src/Formats/JSONUtils.cpp
index 6fbda86915..240f1dba33 100644
--- a/contrib/clickhouse/src/Formats/JSONUtils.cpp
+++ b/contrib/clickhouse/src/Formats/JSONUtils.cpp
@@ -531,6 +531,12 @@ namespace JSONUtils
}
}
+ void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent)
+ {
+ writeTitle("exception", out, indent, " ");
+ writeJSONString(exception_message, out, settings);
+ }
+
Strings makeNamesValidJSONStrings(const Strings & names, const FormatSettings & settings, bool validate_utf8)
{
Strings result;
diff --git a/contrib/clickhouse/src/Formats/JSONUtils.h b/contrib/clickhouse/src/Formats/JSONUtils.h
index bd56eb646c..b2bd29bcd0 100644
--- a/contrib/clickhouse/src/Formats/JSONUtils.h
+++ b/contrib/clickhouse/src/Formats/JSONUtils.h
@@ -108,6 +108,8 @@ namespace JSONUtils
bool write_statistics,
WriteBuffer & out);
+ void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0);
+
void skipColon(ReadBuffer & in);
void skipComma(ReadBuffer & in);
diff --git a/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp b/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp
new file mode 100644
index 0000000000..e72347146f
--- /dev/null
+++ b/contrib/clickhouse/src/IO/PeekableWriteBuffer.cpp
@@ -0,0 +1,86 @@
+#include <IO/PeekableWriteBuffer.h>
+
+namespace DB
+{
+
+PeekableWriteBuffer::PeekableWriteBuffer(DB::WriteBuffer & sub_buf_) : BufferWithOwnMemory(0), sub_buf(sub_buf_)
+{
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin() + sub_buf.offset(), sub_working.size() - sub_buf.offset(), 0);
+}
+
+void PeekableWriteBuffer::nextImpl()
+{
+ if (checkpoint)
+ {
+ if (write_to_own_memory)
+ {
+ size_t prev_size = position() - memory.data();
+ size_t new_size = memory.size() * 2;
+ memory.resize(new_size);
+ BufferBase::set(memory.data() + prev_size, memory.size() - prev_size, 0);
+ return;
+ }
+
+ if (memory.size() == 0)
+ memory.resize(DBMS_DEFAULT_BUFFER_SIZE);
+
+ sub_buf.position() = position();
+ BufferBase::set(memory.data(), memory.size(), 0);
+ write_to_own_memory = true;
+ return;
+ }
+
+ sub_buf.position() = position();
+ sub_buf.next();
+ BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
+}
+
+
+void PeekableWriteBuffer::dropCheckpoint()
+{
+ assert(checkpoint);
+ checkpoint = std::nullopt;
+ /// If we have saved data in own memory, write it to sub-buf.
+ if (write_to_own_memory)
+ {
+ try
+ {
+ sub_buf.next();
+ sub_buf.write(memory.data(), position() - memory.data());
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+ write_to_own_memory = false;
+ }
+ catch (...)
+ {
+ /// If exception happened during writing to sub buffer, we should
+ /// update buffer to not leave it in invalid state.
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+ write_to_own_memory = false;
+ throw;
+ }
+ }
+
+}
+
+void PeekableWriteBuffer::rollbackToCheckpoint(bool drop)
+{
+ assert(checkpoint);
+
+ /// Just ignore all data written after checkpoint.
+ if (write_to_own_memory)
+ {
+ Buffer & sub_working = sub_buf.buffer();
+ BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+ write_to_own_memory = false;
+ }
+
+ position() = *checkpoint;
+
+ if (drop)
+ checkpoint = std::nullopt;
+}
+
+}
diff --git a/contrib/clickhouse/src/IO/PeekableWriteBuffer.h b/contrib/clickhouse/src/IO/PeekableWriteBuffer.h
new file mode 100644
index 0000000000..e7094f11fc
--- /dev/null
+++ b/contrib/clickhouse/src/IO/PeekableWriteBuffer.h
@@ -0,0 +1,59 @@
+#pragma once
+#include <IO/WriteBuffer.h>
+#include <IO/BufferWithOwnMemory.h>
+#include <stack>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+}
+
+/// Similar to PeekableReadBuffer.
+/// Allows to set checkpoint at some position in stream and come back to this position later.
+/// When next() is called, saves data between checkpoint and current position to own memory instead of writing it to sub-buffer.
+/// So, all the data after checkpoint won't be written in sub-buffer until checkpoint is dropped.
+/// Rollback to checkpoint means that all data after checkpoint will be ignored and not sent to sub-buffer.
+/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless
+/// you reset() the state of peekable buffer after each change of underlying buffer).
+/// If position() of peekable buffer is explicitly set to some position before checkpoint
+/// (e.g. by ostr.position() = prev_pos), behavior is undefined.
+class PeekableWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
+{
+ friend class PeekableWriteBufferCheckpoint;
+public:
+ explicit PeekableWriteBuffer(WriteBuffer & sub_buf_);
+
+ /// Sets checkpoint at current position
+ ALWAYS_INLINE inline void setCheckpoint()
+ {
+ if (checkpoint)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "PeekableWriteBuffer does not support recursive checkpoints.");
+
+ checkpoint.emplace(pos);
+ }
+
+ /// Forget checkpoint and send all data from checkpoint to position to sub-buffer.
+ void dropCheckpoint();
+
+ /// Sets position at checkpoint and forgets all data written from checkpoint to position.
+ /// All pointers (such as this->buffer().end()) may be invalidated.
+ void rollbackToCheckpoint(bool drop = false);
+
+ void finalizeImpl() override
+ {
+ assert(!checkpoint);
+ sub_buf.position() = position();
+ }
+
+private:
+ void nextImpl() override;
+
+ WriteBuffer & sub_buf;
+ bool write_to_own_memory = false;
+ std::optional<Position> checkpoint = std::nullopt;
+};
+
+}
diff --git a/contrib/clickhouse/src/Interpreters/executeQuery.cpp b/contrib/clickhouse/src/Interpreters/executeQuery.cpp
index 32d1fd8507..498bb60b73 100644
--- a/contrib/clickhouse/src/Interpreters/executeQuery.cpp
+++ b/contrib/clickhouse/src/Interpreters/executeQuery.cpp
@@ -1261,7 +1261,8 @@ void executeQuery(
bool allow_into_outfile,
ContextMutablePtr context,
SetResultDetailsFunc set_result_details,
- const std::optional<FormatSettings> & output_format_settings)
+ const std::optional<FormatSettings> & output_format_settings,
+ HandleExceptionInOutputFormatFunc handle_exception_in_output_format)
{
PODArray<char> parse_buf;
const char * begin;
@@ -1319,6 +1320,7 @@ void executeQuery(
ASTPtr ast;
BlockIO streams;
+ OutputFormatPtr output_format;
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, &istr);
auto & pipeline = streams.pipeline;
@@ -1361,30 +1363,30 @@ void executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
- auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
+ output_format = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
materializeBlock(pipeline.getHeader()),
context,
output_format_settings);
- out->setAutoFlush();
+ output_format->setAutoFlush();
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
- pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
+ pipeline.setProgressCallback([output_format, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
- out->onProgress(progress);
+ output_format->onProgress(progress);
});
- result_details.content_type = out->getContentType();
+ result_details.content_type = output_format->getContentType();
result_details.format = format_name;
- pipeline.complete(std::move(out));
+ pipeline.complete(output_format);
}
else
{
@@ -1414,6 +1416,8 @@ void executeQuery(
}
catch (...)
{
+ if (handle_exception_in_output_format && output_format)
+ handle_exception_in_output_format(*output_format);
streams.onException();
throw;
}
diff --git a/contrib/clickhouse/src/Interpreters/executeQuery.h b/contrib/clickhouse/src/Interpreters/executeQuery.h
index c2c2e081b3..a31d4f2f08 100644
--- a/contrib/clickhouse/src/Interpreters/executeQuery.h
+++ b/contrib/clickhouse/src/Interpreters/executeQuery.h
@@ -15,6 +15,7 @@ namespace DB
class IInterpreter;
class ReadBuffer;
class WriteBuffer;
+class IOutputFormat;
struct QueryStatusInfo;
struct QueryResultDetails
@@ -26,6 +27,7 @@ struct QueryResultDetails
};
using SetResultDetailsFunc = std::function<void(const QueryResultDetails &)>;
+using HandleExceptionInOutputFormatFunc = std::function<void(IOutputFormat & output_format)>;
/// Parse and execute a query.
void executeQuery(
@@ -34,7 +36,8 @@ void executeQuery(
bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions...
SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone.
- const std::optional<FormatSettings> & output_format_settings = std::nullopt /// Format settings for output format, will be calculated from the context if not set.
+ const std::optional<FormatSettings> & output_format_settings = std::nullopt, /// Format settings for output format, will be calculated from the context if not set.
+ HandleExceptionInOutputFormatFunc handle_exception_in_output_format = {} /// If a non-empty callback is passed, it will be called on exception with created output format.
);
diff --git a/contrib/clickhouse/src/Processors/Formats/IOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/IOutputFormat.h
index 58700a978f..cae2ab7691 100644
--- a/contrib/clickhouse/src/Processors/Formats/IOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/IOutputFormat.h
@@ -71,6 +71,9 @@ public:
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
}
+ virtual bool supportsWritingException() const { return false; }
+ virtual void setException(const String & /*exception_message*/) {}
+
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }
@@ -162,6 +165,11 @@ protected:
/// outputs them in finalize() method.
virtual bool areTotalsAndExtremesUsedInFinalize() const { return false; }
+ /// Derived classes can use some wrappers around the `out` WriteBuffer
+ /// and can override this method to return the wrapper
+ /// that should be used in their derived classes.
+ virtual WriteBuffer * getWriteBufferPtr() { return &out; }
+
WriteBuffer & out;
Chunk current_chunk;
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.cpp
index 490516b7eb..72a009c20b 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.cpp
@@ -9,10 +9,11 @@ namespace DB
JSONColumnsBlockOutputFormatBase::JSONColumnsBlockOutputFormatBase(
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool validate_utf8)
- : OutputFormatWithUTF8ValidationAdaptor(validate_utf8, header_, out_)
+ : OutputFormatWithUTF8ValidationAdaptor(header_, out_, validate_utf8)
, format_settings(format_settings_)
, serializations(header_.getSerializations())
{
+ ostr = OutputFormatWithUTF8ValidationAdaptor::getWriteBufferPtr();
}
void JSONColumnsBlockOutputFormatBase::consume(Chunk chunk)
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.h b/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.h
index 235a6d4da9..d73ac53b97 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.h
@@ -38,6 +38,7 @@ protected:
Chunk mono_chunk;
size_t written_rows = 0;
+ WriteBuffer * ostr;
};
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp
index 0cafc05346..530d09d5c8 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp
@@ -15,12 +15,13 @@ JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer
bool with_names_,
bool with_types_,
bool yield_strings_)
- : RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_)
+ : RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>(header_, out_, settings_.json.valid_output_on_exception, settings_.json.validate_utf8)
, settings(settings_)
, with_names(with_names_)
, with_types(with_types_)
, yield_strings(yield_strings_)
{
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
}
@@ -102,6 +103,25 @@ void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
IRowOutputFormat::consumeTotals(std::move(chunk));
}
+void JSONCompactEachRowRowOutputFormat::writeSuffix()
+{
+ if (!exception_message.empty())
+ {
+ if (haveWrittenData())
+ writeRowBetweenDelimiter();
+
+ writeRowStartDelimiter();
+ writeJSONString(exception_message, *ostr, settings);
+ writeRowEndDelimiter();
+ }
+}
+
+void JSONCompactEachRowRowOutputFormat::resetFormatterImpl()
+{
+ RowOutputFormatWithExceptionHandlerAdaptor::resetFormatterImpl();
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
+}
+
void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
{
for (bool yield_strings : {false, true})
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h
index 2be39669dd..bd32592a4a 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h
@@ -3,15 +3,16 @@
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
+#include <Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h>
#include <Formats/FormatSettings.h>
namespace DB
{
-/** The stream for outputting data in JSON format, by object per line.
+/** The stream for outputting data in JSON format, by JSON array per line.
*/
-class JSONCompactEachRowRowOutputFormat final : public RowOutputFormatWithUTF8ValidationAdaptor
+class JSONCompactEachRowRowOutputFormat final : public RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>
{
public:
JSONCompactEachRowRowOutputFormat(
@@ -33,6 +34,9 @@ private:
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
+ void writeSuffix() override;
+
+ void resetFormatterImpl() override;
bool supportTotals() const override { return true; }
void consumeTotals(Chunk) override;
@@ -43,5 +47,7 @@ private:
bool with_names;
bool with_types;
bool yield_strings;
+
+ WriteBuffer * ostr;
};
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
index 5b8f6cc1af..b805547201 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
@@ -14,10 +14,12 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
const Block & header_,
const FormatSettings & settings_,
bool pretty_json_)
- : RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_),
- pretty_json(pretty_json_),
- settings(settings_)
+ : RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>(
+ header_, out_, settings_.json.valid_output_on_exception, settings_.json.validate_utf8)
+ , pretty_json(pretty_json_)
+ , settings(settings_)
{
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
fields = JSONUtils::makeNamesValidJSONStrings(getPort(PortKind::Main).getHeader().getNames(), settings, settings.json.validate_utf8);
}
@@ -76,10 +78,24 @@ void JSONEachRowRowOutputFormat::writePrefix()
void JSONEachRowRowOutputFormat::writeSuffix()
{
+ if (!exception_message.empty())
+ {
+ if (haveWrittenData())
+ writeRowBetweenDelimiter();
+ writeRowStartDelimiter();
+ JSONUtils::writeException(exception_message, *ostr, settings, pretty_json ? 1 : 0);
+ writeRowEndDelimiter();
+ }
+
if (settings.json.array_of_rows)
writeCString("\n]\n", *ostr);
}
+void JSONEachRowRowOutputFormat::resetFormatterImpl()
+{
+ RowOutputFormatWithExceptionHandlerAdaptor::resetFormatterImpl();
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
+}
void registerOutputFormatJSONEachRow(FormatFactory & factory)
{
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h
index e05d189afe..2de9369846 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h
@@ -2,7 +2,9 @@
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
+#include <IO/PeekableWriteBuffer.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
+#include <Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h>
#include <Formats/FormatSettings.h>
@@ -11,7 +13,7 @@ namespace DB
/** The stream for outputting data in JSON format, by object per line.
*/
-class JSONEachRowRowOutputFormat : public RowOutputFormatWithUTF8ValidationAdaptor
+class JSONEachRowRowOutputFormat : public RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>
{
public:
JSONEachRowRowOutputFormat(
@@ -37,13 +39,16 @@ protected:
void writePrefix() override;
void writeSuffix() override;
+ void resetFormatterImpl() override;
+
size_t field_number = 0;
bool pretty_json;
+ FormatSettings settings;
+ WriteBuffer * ostr;
+
private:
Names fields;
-
- FormatSettings settings;
};
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp
index a02199d607..5d8e74309e 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp
@@ -26,9 +26,10 @@ void JSONObjectEachRowRowOutputFormat::write(const Columns & columns, size_t row
if (field_index_for_object_name)
object_name = columns[*field_index_for_object_name]->getDataAt(row).toString();
else
- object_name = "row_" + std::to_string(row + 1);
+ object_name = "row_" + std::to_string(getRowsReadBefore() + rows + 1);
- IRowOutputFormat::write(columns, row);
+ ++rows;
+ RowOutputFormatWithExceptionHandlerAdaptor::write(columns, row);
}
void JSONObjectEachRowRowOutputFormat::writeFieldDelimiter()
@@ -62,6 +63,13 @@ void JSONObjectEachRowRowOutputFormat::writeRowBetweenDelimiter()
void JSONObjectEachRowRowOutputFormat::writeSuffix()
{
+ if (!exception_message.empty())
+ {
+ if (haveWrittenData())
+ writeRowBetweenDelimiter();
+ JSONUtils::writeException(exception_message, *ostr, settings, 1);
+ }
+
JSONUtils::writeObjectEnd(*ostr);
writeChar('\n', *ostr);
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h
index 1981931e91..d8ab2b09e6 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h
@@ -40,6 +40,7 @@ private:
std::optional<size_t> field_index_for_object_name;
String object_name;
+ size_t rows = 0;
};
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp
index 0193ec7e3d..20182d8491 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp
@@ -13,9 +13,10 @@ JSONRowOutputFormat::JSONRowOutputFormat(
const Block & header,
const FormatSettings & settings_,
bool yield_strings_)
- : RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_), settings(settings_), yield_strings(yield_strings_)
+ : RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>(header, out_, settings_.json.valid_output_on_exception, true), settings(settings_), yield_strings(yield_strings_)
{
names = JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, true);
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
}
@@ -117,9 +118,15 @@ void JSONRowOutputFormat::finalizeImpl()
statistics.applied_limit,
statistics.watch,
statistics.progress,
- settings.write_statistics,
+ settings.write_statistics && exception_message.empty(),
*ostr);
+ if (!exception_message.empty())
+ {
+ writeCString(",\n\n", *ostr);
+ JSONUtils::writeException(exception_message, *ostr, settings, 1);
+ }
+
JSONUtils::writeObjectEnd(*ostr);
writeChar('\n', *ostr);
ostr->next();
@@ -127,7 +134,8 @@ void JSONRowOutputFormat::finalizeImpl()
void JSONRowOutputFormat::resetFormatterImpl()
{
- RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
+ RowOutputFormatWithExceptionHandlerAdaptor::resetFormatterImpl();
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
row_count = 0;
statistics = Statistics();
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.h
index dc3f0541af..a38cd0e8db 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/JSONRowOutputFormat.h
@@ -3,8 +3,10 @@
#include <Core/Block.h>
#include <IO/Progress.h>
#include <IO/WriteBuffer.h>
+#include <IO/PeekableWriteBuffer.h>
#include <Common/Stopwatch.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
+#include <Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h>
#include <Formats/FormatSettings.h>
@@ -13,7 +15,7 @@ namespace DB
/** Stream for output data in JSON format.
*/
-class JSONRowOutputFormat : public RowOutputFormatWithUTF8ValidationAdaptor
+class JSONRowOutputFormat : public RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>
{
public:
JSONRowOutputFormat(
@@ -69,6 +71,7 @@ protected:
FormatSettings settings;
bool yield_strings;
+ WriteBuffer * ostr;
};
}
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp
index 46fe2ba26a..b2871310be 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp
@@ -8,7 +8,6 @@ namespace DB
void ParallelFormattingOutputFormat::finalizeImpl()
{
need_flush = true;
- IOutputFormat::finalized = true;
/// Don't throw any background_exception here, because we want to finalize the execution.
/// Exception will be checked after main thread is finished.
addChunk(Chunk{}, ProcessingUnitType::FINALIZE, /*can_throw_exception*/ false);
@@ -24,8 +23,33 @@ namespace DB
std::lock_guard lock(mutex);
if (background_exception)
- std::rethrow_exception(background_exception);
+ {
+ collector_finished.set();
+ rethrowBackgroundException();
+ }
}
+
+ /// The code below is required to write valid output in case of exception during parallel formatting,
+ /// because we finish formatting and collecting threads in case of exception.
+ /// So, in case of exception, after finalize we may still not have written the prefix/suffix or finalized the underlying format.
+
+ if (collected_prefix && collected_suffix && collected_finalize)
+ return;
+
+ auto formatter = internal_formatter_creator(out);
+ formatter->setRowsReadBefore(rows_collected);
+ formatter->setException(exception_message);
+
+ if (!collected_prefix && (need_write_prefix || started_prefix))
+ formatter->writePrefix();
+
+ if (!collected_suffix && (need_write_suffix || started_suffix))
+ formatter->writeSuffix();
+
+ if (!collected_finalize)
+ formatter->finalizeImpl();
+
+ formatter->finalizeBuffers();
}
void ParallelFormattingOutputFormat::addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception)
@@ -33,7 +57,7 @@ namespace DB
{
std::lock_guard lock(mutex);
if (background_exception && can_throw_exception)
- std::rethrow_exception(background_exception);
+ rethrowBackgroundException();
}
const auto current_unit_number = writer_unit_number % processing_units.size();
@@ -62,7 +86,10 @@ namespace DB
size_t first_row_num = rows_consumed;
if (unit.type == ProcessingUnitType::PLAIN)
+ {
rows_consumed += unit.chunk.getNumRows();
+ unit.rows_num = unit.chunk.getNumRows();
+ }
scheduleFormatterThreadForUnitWithNumber(current_unit_number, first_row_num);
++writer_unit_number;
@@ -125,7 +152,7 @@ namespace DB
assert(unit.status == READY_TO_READ);
/// Use this copy to after notification to stop the execution.
- auto copy_if_unit_type = unit.type;
+ auto copy_of_unit_type = unit.type;
/// Do main work here.
out.write(unit.segment.data(), unit.actual_memory_size);
@@ -134,6 +161,7 @@ namespace DB
IOutputFormat::flush();
++collector_unit_number;
+ rows_collected += unit.rows_num;
{
/// Notify other threads.
@@ -141,9 +169,19 @@ namespace DB
unit.status = READY_TO_INSERT;
writer_condvar.notify_all();
}
- /// We can exit only after writing last piece of to out buffer.
- if (copy_if_unit_type == ProcessingUnitType::FINALIZE)
+
+ if (copy_of_unit_type == ProcessingUnitType::START)
{
+ collected_prefix = true;
+ }
+ else if (copy_of_unit_type == ProcessingUnitType::PLAIN_FINISH)
+ {
+ collected_suffix = true;
+ }
+ /// We can exit only after writing last piece of data to out buffer.
+ else if (copy_of_unit_type == ProcessingUnitType::FINALIZE)
+ {
+ collected_finalize = true;
break;
}
}
@@ -156,7 +194,6 @@ namespace DB
}
}
-
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupPtr & thread_group)
{
SCOPE_EXIT_SAFE(
@@ -184,6 +221,7 @@ namespace DB
auto formatter = internal_formatter_creator(out_buffer);
formatter->setRowsReadBefore(first_row_num);
+ formatter->setException(exception_message);
switch (unit.type)
{
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h
index 490f033b87..bf8968dd37 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h
@@ -118,6 +118,7 @@ public:
void writePrefix() override
{
addChunk(Chunk{}, ProcessingUnitType::START, /*can_throw_exception*/ true);
+ started_prefix = true;
}
void onCancel() override
@@ -134,6 +135,7 @@ public:
void writeSuffix() override
{
addChunk(Chunk{}, ProcessingUnitType::PLAIN_FINISH, /*can_throw_exception*/ true);
+ started_suffix = true;
}
String getContentType() const override
@@ -142,6 +144,14 @@ public:
return internal_formatter_creator(buffer)->getContentType();
}
+ bool supportsWritingException() const override
+ {
+ WriteBufferFromOwnString buffer;
+ return internal_formatter_creator(buffer)->supportsWritingException();
+ }
+
+ void setException(const String & exception_message_) override { exception_message = exception_message_; }
+
private:
void consume(Chunk chunk) override final
{
@@ -214,6 +224,7 @@ private:
Memory<> segment;
size_t actual_memory_size{0};
Statistics statistics;
+ size_t rows_num;
};
Poco::Event collector_finished{};
@@ -241,12 +252,21 @@ private:
std::condition_variable writer_condvar;
size_t rows_consumed = 0;
+ size_t rows_collected = 0;
std::atomic_bool are_totals_written = false;
/// We change statistics in onProgress() which can be called from different threads.
std::mutex statistics_mutex;
bool save_totals_and_extremes_in_statistics;
+ String exception_message;
+ bool exception_is_rethrown = false;
+ bool started_prefix = false;
+ bool collected_prefix = false;
+ bool started_suffix = false;
+ bool collected_suffix = false;
+ bool collected_finalize = false;
+
void finishAndWait();
void onBackgroundException()
@@ -261,6 +281,17 @@ private:
collector_condvar.notify_all();
}
+ void rethrowBackgroundException()
+ {
+ /// Rethrow the background exception only once, because the output format
+ /// may still be used afterwards to write the exception into the output.
+ if (!exception_is_rethrown)
+ {
+ exception_is_rethrown = true;
+ std::rethrow_exception(background_exception);
+ }
+ }
+
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number, size_t first_row_num)
{
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number, first_row_num]
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp b/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp
index 1d6fb62275..52c161c320 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp
@@ -8,8 +8,9 @@ namespace DB
{
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
- : RowOutputFormatWithUTF8ValidationAdaptor(true, header_, out_), fields(header_.getNamesAndTypes()), format_settings(format_settings_)
+ : RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>(header_, out_, format_settings_.xml.valid_output_on_exception, true), fields(header_.getNamesAndTypes()), format_settings(format_settings_)
{
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
const auto & sample = getPort(PortKind::Main).getHeader();
field_tag_names.resize(sample.columns());
@@ -191,7 +192,9 @@ void XMLRowOutputFormat::finalizeImpl()
writeRowsBeforeLimitAtLeast();
- if (format_settings.write_statistics)
+ if (!exception_message.empty())
+ writeException();
+ else if (format_settings.write_statistics)
writeStatistics();
writeCString("</result>\n", *ostr);
@@ -200,7 +203,8 @@ void XMLRowOutputFormat::finalizeImpl()
void XMLRowOutputFormat::resetFormatterImpl()
{
- RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
+ RowOutputFormatWithExceptionHandlerAdaptor::resetFormatterImpl();
+ ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
row_count = 0;
statistics = Statistics();
}
@@ -230,6 +234,12 @@ void XMLRowOutputFormat::writeStatistics()
writeCString("\t</statistics>\n", *ostr);
}
+void XMLRowOutputFormat::writeException()
+{
+ writeCString("\t<exception>", *ostr);
+ writeXMLStringForTextElement(exception_message, *ostr);
+ writeCString("</exception>\n", *ostr);
+}
void registerOutputFormatXML(FormatFactory & factory)
{
diff --git a/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.h b/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.h
index e25e712910..daf03539d0 100644
--- a/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.h
+++ b/contrib/clickhouse/src/Processors/Formats/Impl/XMLRowOutputFormat.h
@@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
+#include <Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h>
namespace DB
@@ -13,7 +14,7 @@ namespace DB
/** A stream for outputting data in XML format.
*/
-class XMLRowOutputFormat final : public RowOutputFormatWithUTF8ValidationAdaptor
+class XMLRowOutputFormat final : public RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>
{
public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
@@ -56,6 +57,7 @@ private:
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);
void writeRowsBeforeLimitAtLeast();
void writeStatistics();
+ void writeException();
size_t field_number = 0;
size_t row_count = 0;
@@ -63,6 +65,7 @@ private:
Names field_tag_names;
const FormatSettings format_settings;
+ WriteBuffer * ostr;
};
}
diff --git a/contrib/clickhouse/src/Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h b/contrib/clickhouse/src/Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h
index 8d8fb9ef0c..4c5c3ef72e 100644
--- a/contrib/clickhouse/src/Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h
+++ b/contrib/clickhouse/src/Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h
@@ -6,15 +6,17 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
+#include <Common/logger_useful.h>
+
namespace DB
{
-template <typename Base, typename... Args>
+template <typename Base>
class OutputFormatWithUTF8ValidationAdaptorBase : public Base
{
public:
- OutputFormatWithUTF8ValidationAdaptorBase(bool validate_utf8, const Block & header, WriteBuffer & out_, Args... args)
- : Base(header, out_, std::forward<Args>(args)...)
+ OutputFormatWithUTF8ValidationAdaptorBase(const Block & header, WriteBuffer & out_, bool validate_utf8)
+ : Base(header, out_)
{
bool values_can_contain_invalid_utf8 = false;
for (const auto & type : this->getPort(IOutputFormat::PortKind::Main).getHeader().getDataTypes())
@@ -24,37 +26,39 @@ public:
}
if (validate_utf8 && values_can_contain_invalid_utf8)
- {
- validating_ostr = std::make_unique<WriteBufferValidUTF8>(this->out);
- ostr = validating_ostr.get();
- }
- else
- ostr = &this->out;
+ validating_ostr = std::make_unique<WriteBufferValidUTF8>(*Base::getWriteBufferPtr());
}
void flush() override
{
- ostr->next();
-
if (validating_ostr)
- this->out.next();
+ validating_ostr->next();
+ Base::flush();
}
void finalizeBuffers() override
{
if (validating_ostr)
validating_ostr->finalize();
+ Base::finalizeBuffers();
}
void resetFormatterImpl() override
{
- validating_ostr = std::make_unique<WriteBufferValidUTF8>(this->out);
- ostr = validating_ostr.get();
+ LOG_DEBUG(&Poco::Logger::get("RowOutputFormatWithExceptionHandlerAdaptor"), "resetFormatterImpl");
+ Base::resetFormatterImpl();
+ if (validating_ostr)
+ validating_ostr = std::make_unique<WriteBufferValidUTF8>(*Base::getWriteBufferPtr());
}
protected:
- /// Point to validating_ostr or out from IOutputFormat, should be used in derived classes instead of out.
- WriteBuffer * ostr;
+ /// Returns buffer that should be used in derived classes instead of out.
+ WriteBuffer * getWriteBufferPtr() override
+ {
+ if (validating_ostr)
+ return validating_ostr.get();
+ return Base::getWriteBufferPtr();
+ }
private:
/// Validates UTF-8 sequences, replaces bad sequences with replacement character.
diff --git a/contrib/clickhouse/src/Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h b/contrib/clickhouse/src/Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h
new file mode 100644
index 0000000000..e1426bb974
--- /dev/null
+++ b/contrib/clickhouse/src/Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h
@@ -0,0 +1,104 @@
+#pragma once
+
+#include <Processors/Formats/IOutputFormat.h>
+#include <Processors/Formats/IRowOutputFormat.h>
+#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
+
+#include <IO/WriteBuffer.h>
+#include <IO/PeekableWriteBuffer.h>
+
+namespace DB
+{
+
+template <typename Base, typename... Args>
+class RowOutputFormatWithExceptionHandlerAdaptor : public Base
+{
+public:
+ RowOutputFormatWithExceptionHandlerAdaptor(const Block & header, WriteBuffer & out_, bool handle_exceptions, Args... args)
+ : Base(header, out_, std::forward<Args>(args)...)
+ {
+ if (handle_exceptions)
+ peekable_out = std::make_unique<PeekableWriteBuffer>(*Base::getWriteBufferPtr());
+ }
+
+ void consume(DB::Chunk chunk) override
+ {
+ if (!peekable_out)
+ {
+ Base::consume(std::move(chunk));
+ return;
+ }
+
+ auto num_rows = chunk.getNumRows();
+ const auto & columns = chunk.getColumns();
+
+ for (size_t row = 0; row < num_rows; ++row)
+ {
+ /// It's important to set the checkpoint before writing the row-between delimiter, so that rollbackToCheckpoint() on a failed row also covers the delimiter.
+ peekable_out->setCheckpoint();
+
+ if (Base::haveWrittenData())
+ writeRowBetweenDelimiter();
+
+ try
+ {
+ write(columns, row);
+ }
+ catch (...)
+ {
+ peekable_out->rollbackToCheckpoint(/*drop=*/true);
+ throw;
+ }
+ peekable_out->dropCheckpoint();
+
+ Base::first_row = false;
+ }
+ }
+
+ void write(const Columns & columns, size_t row_num) override { Base::write(columns, row_num); }
+ void writeRowBetweenDelimiter() override { Base::writeRowBetweenDelimiter(); }
+
+ void flush() override
+ {
+ if (peekable_out)
+ peekable_out->next();
+
+ Base::flush();
+ }
+
+ void finalizeBuffers() override
+ {
+ if (peekable_out)
+ peekable_out->finalize();
+ Base::finalizeBuffers();
+ }
+
+ void resetFormatterImpl() override
+ {
+ Base::resetFormatterImpl();
+ if (peekable_out)
+ peekable_out = std::make_unique<PeekableWriteBuffer>(*Base::getWriteBufferPtr());
+ }
+
+ bool supportsWritingException() const override { return true; }
+
+ void setException(const String & exception_message_) override { exception_message = exception_message_; }
+
+protected:
+ /// Returns buffer that should be used in derived classes instead of out.
+ WriteBuffer * getWriteBufferPtr() override
+ {
+ if (peekable_out)
+ return peekable_out.get();
+ return Base::getWriteBufferPtr();
+ }
+
+ String exception_message;
+
+private:
+
+ std::unique_ptr<PeekableWriteBuffer> peekable_out;
+};
+
+}
+
diff --git a/contrib/clickhouse/src/Server/HTTPHandler.cpp b/contrib/clickhouse/src/Server/HTTPHandler.cpp
index 41ed78bc69..a58f3f64e6 100644
--- a/contrib/clickhouse/src/Server/HTTPHandler.cpp
+++ b/contrib/clickhouse/src/Server/HTTPHandler.cpp
@@ -28,6 +28,8 @@
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTSetQuery.h>
+#include <Processors/Formats/IOutputFormat.h>
+#include <Formats/FormatFactory.h>
#include <base/getFQDNOrHostName.h>
#include <base/scope_guard.h>
@@ -833,23 +835,40 @@ void HTTPHandler::processQuery(
customizeContext(request, context, *in_post_maybe_compressed);
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
- executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
- [&response, this] (const QueryResultDetails & details)
- {
- response.add("X-ClickHouse-Query-Id", details.query_id);
+ auto set_query_result = [&response, this] (const QueryResultDetails & details)
+ {
+ response.add("X-ClickHouse-Query-Id", details.query_id);
- if (content_type_override)
- response.setContentType(*content_type_override);
- else if (details.content_type)
- response.setContentType(*details.content_type);
+ if (content_type_override)
+ response.setContentType(*content_type_override);
+ else if (details.content_type)
+ response.setContentType(*details.content_type);
- if (details.format)
- response.add("X-ClickHouse-Format", *details.format);
+ if (details.format)
+ response.add("X-ClickHouse-Format", *details.format);
+
+ if (details.timezone)
+ response.add("X-ClickHouse-Timezone", *details.timezone);
+ };
- if (details.timezone)
- response.add("X-ClickHouse-Timezone", *details.timezone);
+ auto handle_exception_in_output_format = [&](IOutputFormat & output_format)
+ {
+ if (settings.http_write_exception_in_output_format && output_format.supportsWritingException())
+ {
+ output_format.setException(getCurrentExceptionMessage(false));
+ output_format.finalize();
+ used_output.exception_is_written = true;
}
- );
+ };
+
+ executeQuery(
+ *in,
+ *used_output.out_maybe_delayed_and_compressed,
+ /* allow_into_outfile = */ false,
+ context,
+ set_query_result,
+ {},
+ handle_exception_in_output_format);
if (used_output.hasDelayed())
{
@@ -893,7 +912,7 @@ try
response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code));
}
- if (!response.sent() && !used_output.out_maybe_compressed)
+ if (!response.sent() && !used_output.out_maybe_compressed && !used_output.exception_is_written)
{
/// If nothing was sent yet and we don't even know if we must compress the response.
*response.send() << s << std::endl;
@@ -909,21 +928,24 @@ try
used_output.out_maybe_delayed_and_compressed.reset();
}
- /// Send the error message into already used (and possibly compressed) stream.
- /// Note that the error message will possibly be sent after some data.
- /// Also HTTP code 200 could have already been sent.
+ if (!used_output.exception_is_written)
+ {
+ /// Send the error message into already used (and possibly compressed) stream.
+ /// Note that the error message will possibly be sent after some data.
+ /// Also HTTP code 200 could have already been sent.
- /// If buffer has data, and that data wasn't sent yet, then no need to send that data
- bool data_sent = used_output.out->count() != used_output.out->offset();
+ /// If buffer has data, and that data wasn't sent yet, then no need to send that data
+ bool data_sent = used_output.out->count() != used_output.out->offset();
- if (!data_sent)
- {
- used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin();
- used_output.out->position() = used_output.out->buffer().begin();
- }
+ if (!data_sent)
+ {
+ used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin();
+ used_output.out->position() = used_output.out->buffer().begin();
+ }
- writeString(s, *used_output.out_maybe_compressed);
- writeChar('\n', *used_output.out_maybe_compressed);
+ writeString(s, *used_output.out_maybe_compressed);
+ writeChar('\n', *used_output.out_maybe_compressed);
+ }
used_output.out_maybe_compressed->next();
}
diff --git a/contrib/clickhouse/src/Server/HTTPHandler.h b/contrib/clickhouse/src/Server/HTTPHandler.h
index 5eda592753..94b5a44f10 100644
--- a/contrib/clickhouse/src/Server/HTTPHandler.h
+++ b/contrib/clickhouse/src/Server/HTTPHandler.h
@@ -62,6 +62,8 @@ private:
bool finalized = false;
+ bool exception_is_written = false;
+
inline bool hasDelayed() const
{
return out_maybe_delayed_and_compressed != out_maybe_compressed;