diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-19 19:43:21 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-19 19:43:21 +0300 |
commit | 3e8c90d237c6c4e794959a21f3b4c5ecb59fff64 (patch) | |
tree | 288eb148cf7e61b46e15d7273f47ef6bed0e6ae7 | |
parent | 64735435ab95ae8e9622d9016f4db18ea60459c0 (diff) | |
download | ydb-3e8c90d237c6c4e794959a21f3b4c5ecb59fff64.tar.gz |
Import more output formats.
11 files changed, 599 insertions, 0 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt index 489a1921907..11264877585 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt @@ -435,12 +435,16 @@ target_sources(clickhouse_client_udf.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/AvroRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/RawBLOBRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp ) diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp index 9dc0db62432..cd62297e8fd 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp @@ -16,10 +16,14 @@ void registerInputFormatProcessorRawBLOB(FormatFactory & factory); void registerInputFormatProcessorORC(FormatFactory & factory); void registerInputFormatProcessorArrow(FormatFactory & factory); void registerInputFormatProcessorParquet(FormatFactory & factory); +void registerOutputFormatProcessorParquet(FormatFactory & factory); void registerInputFormatProcessorAvro(FormatFactory & factory); void registerInputFormatProcessorCSV(FormatFactory & factory); +void registerOutputFormatProcessorCSV(FormatFactory & factory); void registerInputFormatProcessorTSKV(FormatFactory & factory); +void registerOutputFormatProcessorTSKV(FormatFactory & factory); void registerInputFormatProcessorTabSeparated(FormatFactory & factory); +void registerOutputFormatProcessorTabSeparated(FormatFactory & factory); void registerFormats() { @@ -35,10 +39,14 @@ void registerFormats() registerInputFormatProcessorORC(factory); registerInputFormatProcessorArrow(factory); registerInputFormatProcessorParquet(factory); + registerOutputFormatProcessorParquet(factory); registerInputFormatProcessorAvro(factory); registerInputFormatProcessorCSV(factory); + registerOutputFormatProcessorCSV(factory); registerInputFormatProcessorTSKV(factory); + registerOutputFormatProcessorTSKV(factory); registerInputFormatProcessorTabSeparated(factory); + registerOutputFormatProcessorTabSeparated(factory); } } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp new file mode 100644 index 00000000000..4957af927e3 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp @@ -0,0 +1,89 @@ +#include <Processors/Formats/Impl/CSVRowOutputFormat.h> +#include <Formats/FormatFactory.h> + +#include <IO/WriteHelpers.h> + + +namespace NDB +{ + + +CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_) + : IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_) +{ + const auto & sample = getPort(PortKind::Main).getHeader(); + size_t columns = sample.columns(); + data_types.resize(columns); + for (size_t i = 0; i < columns; ++i) + data_types[i] = sample.safeGetByPosition(i).type; +} + + +void CSVRowOutputFormat::doWritePrefix() +{ + const auto & sample = getPort(PortKind::Main).getHeader(); + size_t columns = sample.columns(); + + if (with_names) + { + for (size_t i = 0; i < columns; ++i) + { + writeCSVString(sample.safeGetByPosition(i).name, out); + + char delimiter = format_settings.csv.delimiter; + if (i + 1 == columns) + delimiter = '\n'; + + writeChar(delimiter, out); + } + } +} + + +void CSVRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) +{ + serialization.serializeTextCSV(column, row_num, out, format_settings); +} + + +void CSVRowOutputFormat::writeFieldDelimiter() +{ + writeChar(format_settings.csv.delimiter, out); +} + + +void CSVRowOutputFormat::writeRowEndDelimiter() +{ + if (format_settings.csv.crlf_end_of_line) + writeChar('\r', out); + writeChar('\n', out); +} + +void CSVRowOutputFormat::writeBeforeTotals() +{ + writeChar('\n', out); +} + +void CSVRowOutputFormat::writeBeforeExtremes() +{ + writeChar('\n', out); +} + + +void registerOutputFormatProcessorCSV(FormatFactory & factory) +{ + for (bool with_names : {false, true}) + { + factory.registerOutputFormatProcessor(with_names ? "csv_with_names" : "CSV", [=]( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & format_settings) + { + return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, params, format_settings); + }); + factory.markOutputFormatSupportsParallelFormatting(with_names ? "csv_with_names" : "CSV"); + } +} + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h new file mode 100644 index 00000000000..e628bfc3d11 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowOutputFormat.h @@ -0,0 +1,48 @@ +#pragma once + +#include <Core/Block.h> +#include <Processors/Formats/IRowOutputFormat.h> +#include <Formats/FormatSettings.h> + + +namespace NDB +{ + +class WriteBuffer; + + +/** The stream for outputting data in csv format. + * Does not conform with https://tools.ietf.org/html/rfc4180 because it uses LF, not CR LF. + */ +class CSVRowOutputFormat : public IRowOutputFormat +{ +public: + /** with_names - output in the first line a header with column names + * with_types - output in the next line header with the names of the types + */ + CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_); + + String getName() const override { return "CSVRowOutputFormat"; } + + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; + void writeFieldDelimiter() override; + void writeRowEndDelimiter() override; + void writeBeforeTotals() override; + void writeBeforeExtremes() override; + + void doWritePrefix() override; + + /// https://www.iana.org/assignments/media-types/text/csv + String getContentType() const override + { + return String("text/csv; charset=UTF-8; header=") + (with_names ? "present" : "absent"); + } + +protected: + + bool with_names; + const FormatSettings format_settings; + DataTypes data_types; +}; + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp new file mode 100644 index 00000000000..e5c1158ed74 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -0,0 +1,102 @@ +#include "ParquetBlockOutputFormat.h" + +#if USE_PARQUET + +#include <Formats/FormatFactory.h> +#include <parquet/arrow/writer.h> +#include "ArrowBufferedStreams.h" +#include "CHColumnToArrowColumn.h" + + +namespace NDB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_EXCEPTION; +} + +ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_) + : IOutputFormat(header_, out_), format_settings{format_settings_} +{ +} + +void ParquetBlockOutputFormat::consume(Chunk chunk) +{ + const size_t columns_num = chunk.getNumColumns(); + std::shared_ptr<arrow::Table> arrow_table; + + if (!ch_column_to_arrow_column) + { + const Block & header = getPort(PortKind::Main).getHeader(); + ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(header, "Parquet", false); + } + + ch_column_to_arrow_column->chChunkToArrowTable(arrow_table, chunk, columns_num); + + if (!file_writer) + { + auto sink = std::make_shared<ArrowBufferedOutputStream>(out); + + parquet::WriterProperties::Builder builder; +#if USE_SNAPPY + builder.compression(parquet::Compression::SNAPPY); +#endif + auto props = builder.build(); + auto status = parquet::arrow::FileWriter::Open( + *arrow_table->schema(), + arrow::default_memory_pool(), + sink, + props, /*parquet::default_writer_properties(),*/ + &file_writer); + if (!status.ok()) + throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION}; + } + + // TODO: calculate row_group_size depending on a number of rows and table size + auto status = file_writer->WriteTable(*arrow_table, format_settings.parquet.row_group_size); + + if (!status.ok()) + throw Exception{"Error while writing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION}; +} + +void ParquetBlockOutputFormat::finalize() +{ + if (!file_writer) + { + const Block & header = getPort(PortKind::Main).getHeader(); + + consume(Chunk(header.getColumns(), 0)); + } + + auto status = file_writer->Close(); + if (!status.ok()) + throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION}; +} + +void registerOutputFormatProcessorParquet(FormatFactory & factory) +{ + factory.registerOutputFormatProcessor( + "parquet", + [](WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams &, + const FormatSettings & format_settings) + { + return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings); + }); +} + +} + +#else + +namespace DB +{ +class FormatFactory; +void registerOutputFormatProcessorParquet(FormatFactory &) +{ +} +} + +#endif diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h new file mode 100644 index 00000000000..6ae69edbd6d --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h @@ -0,0 +1,49 @@ +#pragma once + +//#if !defined(ARCADIA_BUILD) +//# include "config_formats.h" +//#endif +#if USE_PARQUET +# include <Processors/Formats/IOutputFormat.h> +# include <Formats/FormatSettings.h> + +namespace arrow +{ +class Array; +class DataType; +} + +namespace parquet +{ +namespace arrow +{ + class FileWriter; +} +} + +namespace NDB +{ + +class CHColumnToArrowColumn; + +class ParquetBlockOutputFormat : public IOutputFormat +{ +public: + ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); + + String getName() const override { return "ParquetBlockOutputFormat"; } + void consume(Chunk) override; + void finalize() override; + + String getContentType() const override { return "application/octet-stream"; } + +private: + const FormatSettings format_settings; + + std::unique_ptr<parquet::arrow::FileWriter> file_writer; + std::unique_ptr<CHColumnToArrowColumn> ch_column_to_arrow_column; +}; + +} + +#endif diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp new file mode 100644 index 00000000000..5e3bdb9d54a --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp @@ -0,0 +1,55 @@ +#include <IO/WriteHelpers.h> +#include <IO/WriteBufferFromString.h> +#include <Processors/Formats/Impl/TSKVRowOutputFormat.h> +#include <Formats/FormatFactory.h> + + +namespace NDB +{ + +TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings_) + : TabSeparatedRowOutputFormat(out_, header, false, false, params_, format_settings_) +{ + const auto & sample = getPort(PortKind::Main).getHeader(); + NamesAndTypesList columns(sample.getNamesAndTypesList()); + fields.assign(columns.begin(), columns.end()); + + for (auto & field : fields) + { + WriteBufferFromOwnString wb; + writeAnyEscapedString<'='>(field.name.data(), field.name.data() + field.name.size(), wb); + writeCString("=", wb); + field.name = wb.str(); + } +} + + +void TSKVRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) +{ + writeString(fields[field_number].name, out); + serialization.serializeTextEscaped(column, row_num, out, format_settings); + ++field_number; +} + + +void TSKVRowOutputFormat::writeRowEndDelimiter() +{ + writeChar('\n', out); + field_number = 0; +} + + +void registerOutputFormatProcessorTSKV(FormatFactory & factory) +{ + factory.registerOutputFormatProcessor("TSKV", []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & settings) + { + return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings); + }); + factory.markOutputFormatSupportsParallelFormatting("TSKV"); +} + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h new file mode 100644 index 00000000000..b72934d9ccb --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TSKVRowOutputFormat.h @@ -0,0 +1,29 @@ +#pragma once + +#include <Formats/FormatSettings.h> +#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h> + + +namespace NDB +{ + +/** The stream for outputting data in the TSKV format. + * TSKV is similar to TabSeparated, but before every value, its name and equal sign are specified: name=value. + * This format is very inefficient. + */ +class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat +{ +public: + TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings); + + String getName() const override { return "TSKVRowOutputFormat"; } + + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; + void writeRowEndDelimiter() override; + +protected: + NamesAndTypes fields; + size_t field_number = 0; +}; + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h new file mode 100644 index 00000000000..1a7caadf034 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h @@ -0,0 +1,35 @@ +#pragma once + +#include <Formats/FormatSettings.h> +#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h> + + +namespace NDB +{ + +/** A stream for outputting data in tsv format, but without escaping individual values. + * (That is, the output is irreversible.) + */ +class TabSeparatedRawRowOutputFormat : public TabSeparatedRowOutputFormat +{ +public: + TabSeparatedRawRowOutputFormat( + WriteBuffer & out_, + const Block & header_, + bool with_names_, + bool with_types_, + const RowOutputFormatParams & params_, + const FormatSettings & format_settings_) + : TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, params_, format_settings_) + { + } + + String getName() const override { return "TabSeparatedRawRowOutputFormat"; } + + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override + { + serialization.serializeText(column, row_num, out, format_settings); + } +}; + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp new file mode 100644 index 00000000000..11c93845f43 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp @@ -0,0 +1,131 @@ +#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h> +#include <Processors/Formats/Impl/TabSeparatedRawRowOutputFormat.h> +#include <Formats/FormatFactory.h> +#include <IO/WriteHelpers.h> + + +namespace NDB +{ +TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat( + WriteBuffer & out_, + const Block & header_, + bool with_names_, + bool with_types_, + const RowOutputFormatParams & params_, + const FormatSettings & format_settings_) + : IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_) +{ +} + + +void TabSeparatedRowOutputFormat::doWritePrefix() +{ + const auto & header = getPort(PortKind::Main).getHeader(); + size_t columns = header.columns(); + + if (with_names) + { + for (size_t i = 0; i < columns; ++i) + { + writeEscapedString(header.safeGetByPosition(i).name, out); + writeChar(i == columns - 1 ? '\n' : '\t', out); + } + } + + if (with_types) + { + for (size_t i = 0; i < columns; ++i) + { + writeEscapedString(header.safeGetByPosition(i).type->getName(), out); + writeChar(i == columns - 1 ? '\n' : '\t', out); + } + } +} + + +void TabSeparatedRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) +{ + serialization.serializeTextEscaped(column, row_num, out, format_settings); +} + + +void TabSeparatedRowOutputFormat::writeFieldDelimiter() +{ + writeChar('\t', out); +} + + +void TabSeparatedRowOutputFormat::writeRowEndDelimiter() +{ + if (format_settings.tsv.crlf_end_of_line) + writeChar('\r', out); + writeChar('\n', out); +} + +void TabSeparatedRowOutputFormat::writeBeforeTotals() +{ + writeChar('\n', out); +} + +void TabSeparatedRowOutputFormat::writeBeforeExtremes() +{ + writeChar('\n', out); +} + + +void registerOutputFormatProcessorTabSeparated(FormatFactory & factory) +{ + for (const auto * name : {"TabSeparated", "TSV"}) + { + factory.registerOutputFormatProcessor(name, []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & settings) + { + return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, params, settings); + }); + factory.markOutputFormatSupportsParallelFormatting(name); + } + + for (const auto * name : {"TabSeparatedRaw", "TSVRaw"}) + { + factory.registerOutputFormatProcessor(name, []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & settings) + { + return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, params, settings); + }); + factory.markOutputFormatSupportsParallelFormatting(name); + } + + for (const auto * name : {"TabSeparatedWithNames", "tsv_with_names"}) + { + factory.registerOutputFormatProcessor(name, []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & settings) + { + return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, params, settings); + }); + factory.markOutputFormatSupportsParallelFormatting(name); + } + + for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"}) + { + factory.registerOutputFormatProcessor(name, []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & settings) + { + return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, params, settings); + }); + factory.markOutputFormatSupportsParallelFormatting(name); + } +} + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h new file mode 100644 index 00000000000..05eb0a36262 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h @@ -0,0 +1,49 @@ +#pragma once + +#include <Core/Block.h> +#include <Formats/FormatSettings.h> +#include <Processors/Formats/IRowOutputFormat.h> + + +namespace NDB +{ + +class WriteBuffer; + +/** A stream for outputting data in tsv format. + */ +class TabSeparatedRowOutputFormat : public IRowOutputFormat +{ +public: + /** with_names - output in the first line a header with column names + * with_types - output the next line header with the names of the types + */ + TabSeparatedRowOutputFormat( + WriteBuffer & out_, + const Block & header_, + bool with_names_, + bool with_types_, + const RowOutputFormatParams & params_, + const FormatSettings & format_settings_); + + String getName() const override { return "TabSeparatedRowOutputFormat"; } + + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; + void writeFieldDelimiter() override; + void writeRowEndDelimiter() override; + void writeBeforeTotals() override; + void writeBeforeExtremes() override; + + void doWritePrefix() override; + + /// https://www.iana.org/assignments/media-types/text/tab-separated-values + String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; } + +protected: + + bool with_names; + bool with_types; + const FormatSettings format_settings; +}; + +} |