aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-07-19 19:43:21 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-07-19 19:43:21 +0300
commit3e8c90d237c6c4e794959a21f3b4c5ecb59fff64 (patch)
tree288eb148cf7e61b46e15d7273f47ef6bed0e6ae7
parent64735435ab95ae8e9622d9016f4db18ea60459c0 (diff)
downloadydb-3e8c90d237c6c4e794959a21f3b4c5ecb59fff64.tar.gz
Import more output formats.
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt4
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp8
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp89
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h48
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp102
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h49
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp55
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h29
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h35
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp131
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h49
11 files changed, 599 insertions, 0 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
index 489a1921907..11264877585 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
+++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
@@ -435,12 +435,16 @@ target_sources(clickhouse_client_udf.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/RawBLOBRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp
)
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
index 9dc0db62432..cd62297e8fd 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
@@ -16,10 +16,14 @@ void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
+void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory);
+void registerOutputFormatProcessorCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory);
+void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
+void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
void registerFormats()
{
@@ -35,10 +39,14 @@ void registerFormats()
registerInputFormatProcessorORC(factory);
registerInputFormatProcessorArrow(factory);
registerInputFormatProcessorParquet(factory);
+ registerOutputFormatProcessorParquet(factory);
registerInputFormatProcessorAvro(factory);
registerInputFormatProcessorCSV(factory);
+ registerOutputFormatProcessorCSV(factory);
registerInputFormatProcessorTSKV(factory);
+ registerOutputFormatProcessorTSKV(factory);
registerInputFormatProcessorTabSeparated(factory);
+ registerOutputFormatProcessorTabSeparated(factory);
}
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp
new file mode 100644
index 00000000000..4957af927e3
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp
@@ -0,0 +1,89 @@
+#include <Processors/Formats/Impl/CSVRowOutputFormat.h>
+#include <Formats/FormatFactory.h>
+
+#include <IO/WriteHelpers.h>
+
+
+namespace NDB
+{
+
+
+CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
+ : IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_)
+{
+ const auto & sample = getPort(PortKind::Main).getHeader();
+ size_t columns = sample.columns();
+ data_types.resize(columns);
+ for (size_t i = 0; i < columns; ++i)
+ data_types[i] = sample.safeGetByPosition(i).type;
+}
+
+
+void CSVRowOutputFormat::doWritePrefix()
+{
+ const auto & sample = getPort(PortKind::Main).getHeader();
+ size_t columns = sample.columns();
+
+ if (with_names)
+ {
+ for (size_t i = 0; i < columns; ++i)
+ {
+ writeCSVString(sample.safeGetByPosition(i).name, out);
+
+ char delimiter = format_settings.csv.delimiter;
+ if (i + 1 == columns)
+ delimiter = '\n';
+
+ writeChar(delimiter, out);
+ }
+ }
+}
+
+
+void CSVRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
+{
+ serialization.serializeTextCSV(column, row_num, out, format_settings);
+}
+
+
+void CSVRowOutputFormat::writeFieldDelimiter()
+{
+ writeChar(format_settings.csv.delimiter, out);
+}
+
+
+void CSVRowOutputFormat::writeRowEndDelimiter()
+{
+ if (format_settings.csv.crlf_end_of_line)
+ writeChar('\r', out);
+ writeChar('\n', out);
+}
+
+void CSVRowOutputFormat::writeBeforeTotals()
+{
+ writeChar('\n', out);
+}
+
+void CSVRowOutputFormat::writeBeforeExtremes()
+{
+ writeChar('\n', out);
+}
+
+
+void registerOutputFormatProcessorCSV(FormatFactory & factory)
+{
+ for (bool with_names : {false, true})
+ {
+ factory.registerOutputFormatProcessor(with_names ? "csv_with_names" : "CSV", [=](
+ WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams & params,
+ const FormatSettings & format_settings)
+ {
+ return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, params, format_settings);
+ });
+ factory.markOutputFormatSupportsParallelFormatting(with_names ? "csv_with_names" : "CSV");
+ }
+}
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h
new file mode 100644
index 00000000000..e628bfc3d11
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h
@@ -0,0 +1,48 @@
+#pragma once
+
+#include <Core/Block.h>
+#include <Processors/Formats/IRowOutputFormat.h>
+#include <Formats/FormatSettings.h>
+
+
+namespace NDB
+{
+
+class WriteBuffer;
+
+
+/** The stream for outputting data in csv format.
+ * Does not conform with https://tools.ietf.org/html/rfc4180 because it uses LF, not CR LF.
+ */
+class CSVRowOutputFormat : public IRowOutputFormat
+{
+public:
+ /** with_names - output in the first line a header with column names
+ * with_types - output in the next line header with the names of the types
+ */
+ CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
+
+ String getName() const override { return "CSVRowOutputFormat"; }
+
+ void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
+ void writeFieldDelimiter() override;
+ void writeRowEndDelimiter() override;
+ void writeBeforeTotals() override;
+ void writeBeforeExtremes() override;
+
+ void doWritePrefix() override;
+
+ /// https://www.iana.org/assignments/media-types/text/csv
+ String getContentType() const override
+ {
+ return String("text/csv; charset=UTF-8; header=") + (with_names ? "present" : "absent");
+ }
+
+protected:
+
+ bool with_names;
+ const FormatSettings format_settings;
+ DataTypes data_types;
+};
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
new file mode 100644
index 00000000000..e5c1158ed74
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
@@ -0,0 +1,102 @@
+#include "ParquetBlockOutputFormat.h"
+
+#if USE_PARQUET
+
+#include <Formats/FormatFactory.h>
+#include <parquet/arrow/writer.h>
+#include "ArrowBufferedStreams.h"
+#include "CHColumnToArrowColumn.h"
+
+
+namespace NDB
+{
+
+namespace ErrorCodes
+{
+ extern const int UNKNOWN_EXCEPTION;
+}
+
+ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
+ : IOutputFormat(header_, out_), format_settings{format_settings_}
+{
+}
+
+void ParquetBlockOutputFormat::consume(Chunk chunk)
+{
+ const size_t columns_num = chunk.getNumColumns();
+ std::shared_ptr<arrow::Table> arrow_table;
+
+ if (!ch_column_to_arrow_column)
+ {
+ const Block & header = getPort(PortKind::Main).getHeader();
+ ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(header, "Parquet", false);
+ }
+
+ ch_column_to_arrow_column->chChunkToArrowTable(arrow_table, chunk, columns_num);
+
+ if (!file_writer)
+ {
+ auto sink = std::make_shared<ArrowBufferedOutputStream>(out);
+
+ parquet::WriterProperties::Builder builder;
+#if USE_SNAPPY
+ builder.compression(parquet::Compression::SNAPPY);
+#endif
+ auto props = builder.build();
+ auto status = parquet::arrow::FileWriter::Open(
+ *arrow_table->schema(),
+ arrow::default_memory_pool(),
+ sink,
+ props, /*parquet::default_writer_properties(),*/
+ &file_writer);
+ if (!status.ok())
+ throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
+ }
+
+ // TODO: calculate row_group_size depending on a number of rows and table size
+ auto status = file_writer->WriteTable(*arrow_table, format_settings.parquet.row_group_size);
+
+ if (!status.ok())
+ throw Exception{"Error while writing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
+}
+
+void ParquetBlockOutputFormat::finalize()
+{
+ if (!file_writer)
+ {
+ const Block & header = getPort(PortKind::Main).getHeader();
+
+ consume(Chunk(header.getColumns(), 0));
+ }
+
+ auto status = file_writer->Close();
+ if (!status.ok())
+ throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
+}
+
+void registerOutputFormatProcessorParquet(FormatFactory & factory)
+{
+ factory.registerOutputFormatProcessor(
+ "parquet",
+ [](WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams &,
+ const FormatSettings & format_settings)
+ {
+ return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
+ });
+}
+
+}
+
+#else
+
+namespace DB
+{
+class FormatFactory;
+void registerOutputFormatProcessorParquet(FormatFactory &)
+{
+}
+}
+
+#endif
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h
new file mode 100644
index 00000000000..6ae69edbd6d
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h
@@ -0,0 +1,49 @@
+#pragma once
+
+//#if !defined(ARCADIA_BUILD)
+//# include "config_formats.h"
+//#endif
+#if USE_PARQUET
+# include <Processors/Formats/IOutputFormat.h>
+# include <Formats/FormatSettings.h>
+
+namespace arrow
+{
+class Array;
+class DataType;
+}
+
+namespace parquet
+{
+namespace arrow
+{
+ class FileWriter;
+}
+}
+
+namespace NDB
+{
+
+class CHColumnToArrowColumn;
+
+class ParquetBlockOutputFormat : public IOutputFormat
+{
+public:
+ ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
+
+ String getName() const override { return "ParquetBlockOutputFormat"; }
+ void consume(Chunk) override;
+ void finalize() override;
+
+ String getContentType() const override { return "application/octet-stream"; }
+
+private:
+ const FormatSettings format_settings;
+
+ std::unique_ptr<parquet::arrow::FileWriter> file_writer;
+ std::unique_ptr<CHColumnToArrowColumn> ch_column_to_arrow_column;
+};
+
+}
+
+#endif
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp
new file mode 100644
index 00000000000..5e3bdb9d54a
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp
@@ -0,0 +1,55 @@
+#include <IO/WriteHelpers.h>
+#include <IO/WriteBufferFromString.h>
+#include <Processors/Formats/Impl/TSKVRowOutputFormat.h>
+#include <Formats/FormatFactory.h>
+
+
+namespace NDB
+{
+
+TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
+ : TabSeparatedRowOutputFormat(out_, header, false, false, params_, format_settings_)
+{
+ const auto & sample = getPort(PortKind::Main).getHeader();
+ NamesAndTypesList columns(sample.getNamesAndTypesList());
+ fields.assign(columns.begin(), columns.end());
+
+ for (auto & field : fields)
+ {
+ WriteBufferFromOwnString wb;
+ writeAnyEscapedString<'='>(field.name.data(), field.name.data() + field.name.size(), wb);
+ writeCString("=", wb);
+ field.name = wb.str();
+ }
+}
+
+
+void TSKVRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
+{
+ writeString(fields[field_number].name, out);
+ serialization.serializeTextEscaped(column, row_num, out, format_settings);
+ ++field_number;
+}
+
+
+void TSKVRowOutputFormat::writeRowEndDelimiter()
+{
+ writeChar('\n', out);
+ field_number = 0;
+}
+
+
+void registerOutputFormatProcessorTSKV(FormatFactory & factory)
+{
+ factory.registerOutputFormatProcessor("TSKV", [](
+ WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams & params,
+ const FormatSettings & settings)
+ {
+ return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings);
+ });
+ factory.markOutputFormatSupportsParallelFormatting("TSKV");
+}
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h
new file mode 100644
index 00000000000..b72934d9ccb
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <Formats/FormatSettings.h>
+#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
+
+
+namespace NDB
+{
+
+/** The stream for outputting data in the TSKV format.
+ * TSKV is similar to TabSeparated, but before every value, its name and equal sign are specified: name=value.
+ * This format is very inefficient.
+ */
+class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat
+{
+public:
+ TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings);
+
+ String getName() const override { return "TSKVRowOutputFormat"; }
+
+ void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
+ void writeRowEndDelimiter() override;
+
+protected:
+ NamesAndTypes fields;
+ size_t field_number = 0;
+};
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h
new file mode 100644
index 00000000000..1a7caadf034
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <Formats/FormatSettings.h>
+#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
+
+
+namespace NDB
+{
+
+/** A stream for outputting data in tsv format, but without escaping individual values.
+ * (That is, the output is irreversible.)
+ */
+class TabSeparatedRawRowOutputFormat : public TabSeparatedRowOutputFormat
+{
+public:
+ TabSeparatedRawRowOutputFormat(
+ WriteBuffer & out_,
+ const Block & header_,
+ bool with_names_,
+ bool with_types_,
+ const RowOutputFormatParams & params_,
+ const FormatSettings & format_settings_)
+ : TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, params_, format_settings_)
+ {
+ }
+
+ String getName() const override { return "TabSeparatedRawRowOutputFormat"; }
+
+ void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override
+ {
+ serialization.serializeText(column, row_num, out, format_settings);
+ }
+};
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp
new file mode 100644
index 00000000000..11c93845f43
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp
@@ -0,0 +1,131 @@
+#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
+#include <Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h>
+#include <Formats/FormatFactory.h>
+#include <IO/WriteHelpers.h>
+
+
+namespace NDB
+{
+TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
+ WriteBuffer & out_,
+ const Block & header_,
+ bool with_names_,
+ bool with_types_,
+ const RowOutputFormatParams & params_,
+ const FormatSettings & format_settings_)
+ : IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
+{
+}
+
+
+void TabSeparatedRowOutputFormat::doWritePrefix()
+{
+ const auto & header = getPort(PortKind::Main).getHeader();
+ size_t columns = header.columns();
+
+ if (with_names)
+ {
+ for (size_t i = 0; i < columns; ++i)
+ {
+ writeEscapedString(header.safeGetByPosition(i).name, out);
+ writeChar(i == columns - 1 ? '\n' : '\t', out);
+ }
+ }
+
+ if (with_types)
+ {
+ for (size_t i = 0; i < columns; ++i)
+ {
+ writeEscapedString(header.safeGetByPosition(i).type->getName(), out);
+ writeChar(i == columns - 1 ? '\n' : '\t', out);
+ }
+ }
+}
+
+
+void TabSeparatedRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
+{
+ serialization.serializeTextEscaped(column, row_num, out, format_settings);
+}
+
+
+void TabSeparatedRowOutputFormat::writeFieldDelimiter()
+{
+ writeChar('\t', out);
+}
+
+
+void TabSeparatedRowOutputFormat::writeRowEndDelimiter()
+{
+ if (format_settings.tsv.crlf_end_of_line)
+ writeChar('\r', out);
+ writeChar('\n', out);
+}
+
+void TabSeparatedRowOutputFormat::writeBeforeTotals()
+{
+ writeChar('\n', out);
+}
+
+void TabSeparatedRowOutputFormat::writeBeforeExtremes()
+{
+ writeChar('\n', out);
+}
+
+
+void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
+{
+ for (const auto * name : {"TabSeparated", "TSV"})
+ {
+ factory.registerOutputFormatProcessor(name, [](
+ WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams & params,
+ const FormatSettings & settings)
+ {
+ return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, params, settings);
+ });
+ factory.markOutputFormatSupportsParallelFormatting(name);
+ }
+
+ for (const auto * name : {"TabSeparatedRaw", "TSVRaw"})
+ {
+ factory.registerOutputFormatProcessor(name, [](
+ WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams & params,
+ const FormatSettings & settings)
+ {
+ return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, params, settings);
+ });
+ factory.markOutputFormatSupportsParallelFormatting(name);
+ }
+
+ for (const auto * name : {"TabSeparatedWithNames", "tsv_with_names"})
+ {
+ factory.registerOutputFormatProcessor(name, [](
+ WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams & params,
+ const FormatSettings & settings)
+ {
+ return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, params, settings);
+ });
+ factory.markOutputFormatSupportsParallelFormatting(name);
+ }
+
+ for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
+ {
+ factory.registerOutputFormatProcessor(name, [](
+ WriteBuffer & buf,
+ const Block & sample,
+ const RowOutputFormatParams & params,
+ const FormatSettings & settings)
+ {
+ return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, params, settings);
+ });
+ factory.markOutputFormatSupportsParallelFormatting(name);
+ }
+}
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h
new file mode 100644
index 00000000000..05eb0a36262
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include <Core/Block.h>
+#include <Formats/FormatSettings.h>
+#include <Processors/Formats/IRowOutputFormat.h>
+
+
+namespace NDB
+{
+
+class WriteBuffer;
+
+/** A stream for outputting data in tsv format.
+ */
+class TabSeparatedRowOutputFormat : public IRowOutputFormat
+{
+public:
+ /** with_names - output in the first line a header with column names
+ * with_types - output the next line header with the names of the types
+ */
+ TabSeparatedRowOutputFormat(
+ WriteBuffer & out_,
+ const Block & header_,
+ bool with_names_,
+ bool with_types_,
+ const RowOutputFormatParams & params_,
+ const FormatSettings & format_settings_);
+
+ String getName() const override { return "TabSeparatedRowOutputFormat"; }
+
+ void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
+ void writeFieldDelimiter() override;
+ void writeRowEndDelimiter() override;
+ void writeBeforeTotals() override;
+ void writeBeforeExtremes() override;
+
+ void doWritePrefix() override;
+
+ /// https://www.iana.org/assignments/media-types/text/tab-separated-values
+ String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; }
+
+protected:
+
+ bool with_names;
+ bool with_types;
+ const FormatSettings format_settings;
+};
+
+}