summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-07-11 19:08:43 +0300
committera-romanov <[email protected]>2022-07-11 19:08:43 +0300
commit5a88a82f28dde161dd304636e9a6ed585b2022eb (patch)
tree9a97c1d4f7c8b3e94a5ab4b9a9bfb939669da59a
parentc68a62be7a6ee64daea4e67cefee29c01b941c0e (diff)
SerializeFormat first draft.
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp252
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp142
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h41
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp157
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h47
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h (renamed from ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h)0
9 files changed, 610 insertions, 35 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index dabdb9eaeb3..503947c64f5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -15,7 +15,7 @@
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/ColumnsWithTypeAndName.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatFactory.h>
-#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h>
#endif
#include "yql_s3_read_actor.h"
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
index 48ca0a7ec00..489a1921907 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
+++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
@@ -221,6 +221,7 @@ target_sources(clickhouse_client_udf.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferFromFileDescriptor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferFromFileDescriptorDiscardOnFailure.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferFromPocoSocket.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Compression/CompressionCodecLZ4.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Compression/CompressionCodecMultiple.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Compression/CompressionCodecNone.cpp
@@ -436,6 +437,7 @@ target_sources(clickhouse_client_udf.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/RawBLOBRowInputFormat.cpp
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index cab257b952a..d10c8148261 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -32,8 +32,9 @@
#include "src/Core/ColumnsWithTypeAndName.h"
#include "src/Formats/FormatFactory.h"
-#include "src/Formats/InputStreamFromInputFormat.h"
#include "src/Formats/registerFormats.h"
+#include "src/Processors/Formats/InputStreamFromInputFormat.h"
+#include "src/Processors/Formats/OutputStreamToOutputFormat.h"
#include <util/generic/yexception.h>
#include <util/string/split.h>
@@ -215,11 +216,12 @@ NDB::DataTypePtr MetaToClickHouse(const TColumnMeta& meta) {
return ret;
}
-void PermuteUuid(const char* src, char* dst, bool forward) {
- static ui32 Pairs[16] = { 4, 5, 6, 7, 2, 3, 0, 1, 15, 14, 13, 12, 11, 10, 9, 8 };
- static ui32 InvPairs[16] = { 6, 7, 4, 5, 0, 1, 2, 3, 15, 14, 13, 12, 11, 10, 9, 8 };
+template<bool Forward>
+void PermuteUuid(const char* src, char* dst) {
+ static constexpr ui32 Pairs[16] = { 4, 5, 6, 7, 2, 3, 0, 1, 15, 14, 13, 12, 11, 10, 9, 8 };
+ static constexpr ui32 InvPairs[16] = { 6, 7, 4, 5, 0, 1, 2, 3, 15, 14, 13, 12, 11, 10, 9, 8 };
for (ui32 i = 0; i < 16; ++i) {
- dst[forward ? Pairs[i] : InvPairs[i]] = src[i];
+ dst[Forward ? Pairs[i] : InvPairs[i]] = src[i];
}
}
@@ -320,7 +322,7 @@ TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta&
}
else if (slot == EDataSlot::Uuid) {
char uuid[16];
- PermuteUuid(ref.data, uuid, false);
+ PermuteUuid<false>(ref.data, uuid);
return valueBuilder->NewString({ uuid, sizeof(uuid) }).Release();
}
else if (slot == EDataSlot::Decimal) {
@@ -360,6 +362,49 @@ TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta&
}
}
+// Appends one YQL value to the ClickHouse column `col`, choosing the NDB::Field
+// representation from the data slot recorded in `meta`.
+//   value - the unboxed YQL value to convert.
+//   col   - destination column; exactly one element is inserted on success.
+//   meta  - column description; reads `IsOptional` and `Slot`.
+// Slots not listed in the switch terminate the UDF via UdfTerminate.
+void ConvertInputValue(const TUnboxedValuePod& value, const NDB::IColumn::MutablePtr& col, const TColumnMeta& meta) {
+ // An empty optional is stored as a default-constructed Field
+ // (presumably the ClickHouse NULL - TODO confirm).
+ if (meta.IsOptional && !value)
+ return col->insert(NDB::Field());
+
+ switch (*meta.Slot) {
+ // All string-like slots are passed through as raw bytes.
+ case EDataSlot::Utf8:
+ case EDataSlot::Json:
+ case EDataSlot::String:
+ return col->insert(NDB::Field(value.AsStringRef()));
+ case EDataSlot::Bool:
+ return col->insert(NDB::Field(value.Get<bool>()));
+ case EDataSlot::Uint8:
+ return col->insert(NDB::Field(value.Get<ui8>()));
+ // Date-like slots are read as the unsigned integer of matching width.
+ // NOTE(review): for the Tz* slots only that integer is read here; any
+ // timezone component appears to be dropped - confirm this is intended.
+ case EDataSlot::Uint16:
+ case EDataSlot::Date:
+ case EDataSlot::TzDate:
+ return col->insert(NDB::Field(value.Get<ui16>()));
+ case EDataSlot::Uint32:
+ case EDataSlot::Datetime:
+ case EDataSlot::TzDatetime:
+ return col->insert(NDB::Field(value.Get<ui32>()));
+ case EDataSlot::Uint64:
+ case EDataSlot::Timestamp:
+ case EDataSlot::TzTimestamp:
+ return col->insert(NDB::Field(value.Get<ui64>()));
+ case EDataSlot::Int8:
+ return col->insert(NDB::Field(value.Get<i8>()));
+ case EDataSlot::Int16:
+ return col->insert(NDB::Field(value.Get<i16>()));
+ case EDataSlot::Int32:
+ return col->insert(NDB::Field(value.Get<i32>()));
+ case EDataSlot::Int64:
+ return col->insert(NDB::Field(value.Get<i64>()));
+ case EDataSlot::Uuid: {
+ // The 16 UUID bytes are reordered with the forward permutation before
+ // being handed to ClickHouse (ConvertOutputValue applies the inverse).
+ NDB::UUID uuid;
+ PermuteUuid<true>(value.AsStringRef().Data(), reinterpret_cast<char*>(&uuid));
+ return col->insert(NDB::Field(uuid));
+ }
+ default:
+ UdfTerminate("TODO: Unsupported field type.");
+ }
+}
+
class TParseFromYdb : public TBoxedValue {
public:
class TStreamValue : public TBoxedValue {
@@ -539,6 +584,31 @@ private:
size_t CurrentRow = 0U;
};
+// Builds the ClickHouse format settings used by the format UDFs.
+//   view - optional JSON object text; an empty view means "defaults only".
+// Defaults: skip_unknown_fields and with_names_use_header are switched on.
+// Each flag listed in SUPPORTED_FLAGS is then overridden by the boolean found
+// under the same key in the JSON, if that key is present.
+NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
+ NDB::FormatSettings settings;
+ settings.skip_unknown_fields = true;
+ settings.with_names_use_header = true;
+ if (!view.empty()) {
+ // JSON is constructed from a std::string, so the view is copied first.
+ const std::string str(view);
+ const JSON json(str);
+ // X-macro: SUPPORTED_FLAGS applies `xx` to every overridable flag name.
+#define SUPPORTED_FLAGS(xx) \
+ xx(skip_unknown_fields) \
+ xx(import_nested_json) \
+ xx(with_names_use_header) \
+ xx(null_as_default) \
+
+ // SET_FLAG uses the flag identifier both as the JSON key and as the member name.
+#define SET_FLAG(flag) \
+ if (json.has(#flag)) \
+ settings.flag = json[#flag].get<bool>();
+
+ SUPPORTED_FLAGS(SET_FLAG)
+
+#undef SET_FLAG
+#undef SUPPORTED_FLAGS
+ }
+ return settings;
+}
+
class TParseFormat : public TBoxedValue {
public:
TParseFormat(const std::string_view& type, const std::string_view& settings,
@@ -569,31 +639,6 @@ public:
UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
}
private:
- static NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
- NDB::FormatSettings settings;
- settings.skip_unknown_fields = true;
- settings.with_names_use_header = true;
- if (!view.empty()) {
- const std::string str(view);
- const JSON json(str);
- #define SUPPORTED_FLAGS(xx) \
- xx(skip_unknown_fields) \
- xx(import_nested_json) \
- xx(with_names_use_header) \
- xx(null_as_default) \
-
- #define SET_FLAG(flag) \
- if (json.has(#flag)) \
- settings.flag = json[#flag].get<bool>();
-
- SUPPORTED_FLAGS(SET_FLAG)
-
- #undef SET_FLAG
- #undef SUPPORTED_FLAGS
- }
- return settings;
- }
-
const std::string Type;
const NDB::FormatSettings Settings;
const TSourcePosition Pos;
@@ -700,6 +745,93 @@ private:
const ui32 TupleSize;
};
+// Callable behind the "SerializeFormat" UDF. Run() wraps the input stream of
+// struct rows into a TStreamValue that emits the rows serialized by the
+// ClickHouse output format named `Type`.
+class TSerializeFormat : public TBoxedValue {
+    // Stream adapter: pulls rows from `Stream`, converts them into ClickHouse
+    // columns and writes them through `BlockStream` into `Buffer`.
+    class TStreamValue : public TBoxedValue {
+    public:
+        // Creates the output format for `type` over an in-memory buffer and
+        // immediately writes the format prefix.
+        TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
+            : ValueBuilder(valueBuilder)
+            , Stream(stream)
+            , InMeta(inMeta)
+            , Pos(pos)
+            , Buffer(std::make_unique<NDB::WriteBufferFromOwnString>())
+            , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, NDB::Block(columns), nullptr, {}, settings)))
+            , CurrentBlock(columns)
+        {
+            BlockStream->writePrefix();
+        }
+    private:
+        // Drains the input stream into CurrentBlock and, once the input is
+        // finished, returns the whole serialized payload as a single string.
+        // Returns Yield whenever the input yields, Ok exactly once with the
+        // payload, and Finish on every call after that.
+        // Any exception terminates the UDF with the callee position attached.
+        EFetchStatus Fetch(TUnboxedValue& result) final try {
+            if (IsFinished)
+                return EFetchStatus::Finish;
+
+            auto columns = CurrentBlock.mutateColumns();
+            for (TUnboxedValue row; !IsFinished;) {
+                switch (Stream.Fetch(row)) {
+                    case EFetchStatus::Yield:
+                        // Hand the columns back to the block before leaving;
+                        // otherwise every row converted so far would be
+                        // destroyed together with the local `columns`.
+                        CurrentBlock.setColumns(std::move(columns));
+                        return EFetchStatus::Yield;
+                    case EFetchStatus::Ok:
+                        for (auto i = 0U; i < columns.size(); ++i)
+                            ConvertInputValue(row.GetElement(i), columns[i], InMeta[i]);
+                        continue;
+                    case EFetchStatus::Finish:
+                        IsFinished = true;
+                        break;
+                }
+            }
+
+            // The loop is only left this way once the input is finished:
+            // write the accumulated rows first, then the format suffix, so
+            // that no row can end up after the suffix.
+            CurrentBlock.setColumns(std::move(columns));
+            BlockStream->write(CurrentBlock);
+            BlockStream->writeSuffix();
+            BlockStream->flush();
+            CurrentBlock = CurrentBlock.cloneEmpty();
+
+            result = ValueBuilder->NewString(Buffer->str());
+            Buffer->restart();
+            return EFetchStatus::Ok;
+        }
+        catch (const Poco::Exception& e) {
+            UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+        }
+        catch (const std::exception& e) {
+            UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+        }
+
+        const IValueBuilder* ValueBuilder;
+        const TUnboxedValue Stream;            // input stream of rows
+        const std::vector<TColumnMeta> InMeta; // per-column conversion metadata
+        const TSourcePosition Pos;             // reported in error messages
+
+        // Declaration order matters: BlockStream is constructed over *Buffer.
+        const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer;
+        const std::unique_ptr<NDB::IBlockOutputStream> BlockStream;
+
+        NDB::Block CurrentBlock;               // rows accumulated so far
+        bool IsFinished = false;               // set once the input reported Finish
+    };
+
+public:
+    // type     - output format name passed to the ClickHouse FormatFactory.
+    // settings - optional JSON text parsed by GetFormatSettings.
+    // pos      - source position used in error messages.
+    // inMeta   - conversion metadata, one entry per struct member.
+    // columns  - ClickHouse column descriptions, parallel to inMeta.
+    TSerializeFormat(const std::string_view& type, const std::string_view& settings, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
+        : Type(type), Settings(GetFormatSettings(settings)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
+    {}
+
+    // Wraps the first argument (the input stream) into a serializing stream.
+    TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
+        return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, InMeta, Columns, Pos));
+    }
+    catch (const Poco::Exception& e) {
+        UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+    }
+    catch (const std::exception& e) {
+        UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+    }
+private:
+    const std::string Type;
+    const NDB::FormatSettings Settings;
+
+    const TSourcePosition Pos;
+    const std::vector<TColumnMeta> InMeta;
+    const NDB::ColumnsWithTypeAndName Columns;
+};
+
struct TCHInitializer {
using TWeakPtr = std::weak_ptr<TCHInitializer>;
using TPtr = std::shared_ptr<TCHInitializer>;
@@ -838,7 +970,7 @@ public:
TClickHouseClientModule() = default;
static const TStringRef& Name() {
- static const auto name = TStringRef::Of("ClickHouseClient");
+ static constexpr auto name = TStringRef::Of("ClickHouseClient");
return name;
}
@@ -849,6 +981,7 @@ public:
sink.Add(TStringRef::Of("ParseFormat"))->SetTypeAwareness();
sink.Add(TStringRef::Of("ParseBlocks"))->SetTypeAwareness();
sink.Add(TStringRef::Of("ParseFromYdb"))->SetTypeAwareness();
+ sink.Add(TStringRef::Of("SerializeFormat"))->SetTypeAwareness();
}
void BuildFunctionTypeInfo(
@@ -859,7 +992,6 @@ public:
IFunctionTypeInfoBuilder& builder) const final try {
LazyInitContext();
auto argBuilder = builder.Args();
-
if (name == "ToYqlType") {
argBuilder->Add<TUtf8>();
argBuilder->Add<TUtf8>();
@@ -1013,6 +1145,58 @@ public:
TTypePrinter(*typeHelper, resultType).Out(sb.Out);
return builder.SetError(sb);
}
+ } else if (name == "SerializeFormat") {
+ const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType);
+ if (!userTypeInspector || userTypeInspector.GetElementsCount() < 1) {
+ return builder.SetError("Invalid user type.");
+ }
+
+ const auto argsTypeTuple = userTypeInspector.GetElementType(0);
+ const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple);
+ if (!argsTypeInspector) {
+ return builder.SetError("Invalid user type - expected tuple.");
+ }
+
+ // SerializeFormat accepts exactly one argument: the input stream of rows.
+ if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount != 1) {
+ ::TStringBuilder sb;
+ sb << "Invalid user type - expected one argument, got: " << argsCount;
+ return builder.SetError(sb);
+ }
+
+ const auto inputType = argsTypeInspector.GetElementType(0);
+
+ builder.UserType(userType);
+ builder.Args()->Add(inputType).Done();
+ builder.Returns(builder.Stream()->Item<char*>());
+
+ if (const auto structType = TStructTypeInspector(*typeHelper, TStreamTypeInspector(*typeHelper, inputType).GetItemType())) {
+ std::vector<TColumnMeta> outMeta(structType.GetMembersCount());
+ NDB::ColumnsWithTypeAndName columns(structType.GetMembersCount());
+ for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) {
+ if (auto& meta = outMeta[i]; GetDataType(*typeHelper, structType.GetMemberType(i), meta)) {
+ auto& colsumn = columns[i];
+ colsumn.type = MetaToClickHouse(meta);
+ colsumn.name = structType.GetMemberName(i);
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: ";
+ TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ }
+
+ if (!(flags & TFlags::TypesOnly)) {
+ const std::string_view& typeCfg = typeConfig;
+ const auto jsonFrom = typeCfg.find('{');
+ builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns)));
+ }
+ return;
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible row type: ";
+ TTypePrinter(*typeHelper, inputType).Out(sb.Out);
+ return builder.SetError(sb);
+ }
} else if (name == "ParseFromYdb") {
builder.UserType(userType);
builder.Args()->Add(builder.Stream()->Item<char*>()).Add<TOptional<TUtf8>>().Done();
@@ -1040,7 +1224,7 @@ public:
builder.Implementation(new TParseFromYdb(builder.GetSourcePosition(), std::move(columns)));
}
return;
- } else {
+ } else {
::TStringBuilder sb;
sb << "Incompatible row type: ";
TTypePrinter(*typeHelper, userType).Out(sb.Out);
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
index 70373cc27b6..9dc0db62432 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
@@ -11,6 +11,7 @@ namespace NDB
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
+void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
@@ -29,6 +30,7 @@ void registerFormats()
registerInputFormatProcessorNative(factory);
registerInputFormatProcessorJSONAsString(factory);
registerInputFormatProcessorJSONEachRow(factory);
+ registerOutputFormatProcessorJSONEachRow(factory);
registerInputFormatProcessorRawBLOB(factory);
registerInputFormatProcessorORC(factory);
registerInputFormatProcessorArrow(factory);
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp
new file mode 100644
index 00000000000..89479042f4a
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp
@@ -0,0 +1,142 @@
+#include <Poco/UTF8Encoding.h>
+#include <IO/WriteBufferValidUTF8.h>
+#include <Common/MemoryTracker.h>
+#include <common/types.h>
+
+#ifdef __SSE2__
+ #include <emmintrin.h>
+#endif
+
+
+namespace NDB
+{
+
+const size_t WriteBufferValidUTF8::DEFAULT_SIZE = 4096;
+
+/** Index into the table below with the first byte of a UTF-8 sequence to
+ * get the number of trailing bytes that are supposed to follow it.
+ * Note that *legal* UTF-8 values can't have 4 or 5-bytes. The table is
+ * left as-is for anyone who may want to do such conversion, which was
+ * allowed in earlier algorithms.
+ */
+extern const UInt8 length_of_utf8_sequence[256] =
+{
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
+ 3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4,5,5,5,5,6,6,6,6
+};
+
+
+/// Creates a validating buffer in front of `output_buffer_`.
+/// The internal staging buffer is never smaller than 32 bytes, whatever `size` is passed.
+/// `replacement_` is copied into a std::string; `group_replacements_` is stored as is.
+WriteBufferValidUTF8::WriteBufferValidUTF8(
+ WriteBuffer & output_buffer_, bool group_replacements_, const char * replacement_, size_t size)
+ : BufferWithOwnMemory<WriteBuffer>(std::max(static_cast<size_t>(32), size)), output_buffer(output_buffer_),
+ group_replacements(group_replacements_), replacement(replacement_)
+{
+}
+
+
+/// Writes the replacement sequence to the output buffer.
+/// Does nothing when the replacement is empty, or when replacements are grouped
+/// and the previous thing written was already a replacement.
+inline void WriteBufferValidUTF8::putReplacement()
+{
+ if (replacement.empty() || (group_replacements && just_put_replacement))
+ return;
+
+ just_put_replacement = true;
+ output_buffer.write(replacement.data(), replacement.size());
+}
+
+
+/// Forwards `len` already-validated bytes starting at `data` to the output buffer.
+/// A non-empty write clears `just_put_replacement`, so the next invalid byte
+/// produces a fresh replacement even when replacements are grouped.
+inline void WriteBufferValidUTF8::putValid(char *data, size_t len)
+{
+ if (len == 0)
+ return;
+
+ just_put_replacement = false;
+ output_buffer.write(data, len);
+}
+
+
+/// Validates the bytes accumulated in the staging buffer ([memory.data(), pos)).
+/// Runs of valid UTF-8 are forwarded to the output buffer, each invalid byte is
+/// replaced via putReplacement(), and a trailing sequence that is not complete yet
+/// is kept at the start of the staging buffer for the next call.
+void WriteBufferValidUTF8::nextImpl()
+{
+ char * p = memory.data();
+ /// Start of the current run of valid bytes that has not been forwarded yet.
+ char * valid_start = p;
+
+ while (p < pos)
+ {
+#ifdef __SSE2__
+ /// Fast skip of ASCII
+ static constexpr size_t SIMD_BYTES = 16;
+ const char * simd_end = p + (pos - p) / SIMD_BYTES * SIMD_BYTES;
+
+ /// The movemask is zero only when no byte of the 16-byte chunk has its high bit set.
+ while (p < simd_end && !_mm_movemask_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i*>(p))))
+ p += SIMD_BYTES;
+
+ if (!(p < pos))
+ break;
+#endif
+
+ size_t len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
+
+ if (len > 4)
+ { // NOLINT
+ /// Invalid start of sequence. Skip one byte.
+ putValid(valid_start, p - valid_start);
+ putReplacement();
+ ++p;
+ valid_start = p;
+ }
+ else if (p + len > pos)
+ {
+ /// Sequence was not fully written to this buffer.
+ break;
+ }
+ else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len))
+ {
+ /// Valid sequence.
+ p += len;
+ }
+ else
+ {
+ /// Invalid sequence. Skip just first byte.
+ putValid(valid_start, p - valid_start);
+ putReplacement();
+ ++p;
+ valid_start = p;
+ }
+ }
+
+ /// Forward whatever valid run is still pending.
+ putValid(valid_start, p - valid_start);
+
+ /// Number of bytes of the unfinished trailing sequence (zero if none).
+ size_t cnt = pos - p;
+
+ /// Shift unfinished sequence to start of buffer.
+ for (size_t i = 0; i < cnt; ++i)
+ memory[i] = p[i];
+
+ /// New data is appended right after the bytes that were kept.
+ working_buffer = Buffer(&memory[cnt], memory.data() + memory.size());
+}
+
+
+/// Final flush: forwards everything that can be validated and replaces a
+/// dangling incomplete sequence, if one is left, with the replacement.
+void WriteBufferValidUTF8::finish()
+{
+ /// Write all complete sequences from buffer.
+ nextImpl();
+
+ /// If unfinished sequence at end, then write replacement.
+ /// (nextImpl() starts the working buffer past any bytes it had to keep.)
+ if (working_buffer.begin() != memory.data())
+ putReplacement();
+}
+
+
+/// Flushes the remaining data to the output buffer on destruction.
+WriteBufferValidUTF8::~WriteBufferValidUTF8()
+{
+ /// FIXME move final flush into the caller
+ /// NOTE(review): the lock presumably keeps memory-tracker exceptions from
+ /// escaping the destructor while finish() runs - confirm against MemoryTracker.
+ MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
+ finish();
+}
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h
new file mode 100644
index 00000000000..0e13bb5de0b
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include <IO/WriteBuffer.h>
+#include <IO/BufferWithOwnMemory.h>
+
+
+namespace NDB
+{
+
+/** Writes the data to another buffer, replacing the invalid UTF-8 sequences with the specified sequence.
+ * If the valid UTF-8 is already written, it works faster.
+ * Note: before using the resulting string, destroy this object.
+ */
+class WriteBufferValidUTF8 final : public BufferWithOwnMemory<WriteBuffer>
+{
+private:
+ /// Destination that receives the validated bytes; held by reference, not owned.
+ WriteBuffer & output_buffer;
+ /// When true, consecutive invalid bytes produce a single replacement.
+ bool group_replacements;
+ /// The last recorded character was `replacement`.
+ bool just_put_replacement = false;
+ /// Byte sequence written in place of invalid input; empty means "drop silently".
+ std::string replacement;
+
+ void putReplacement();
+ void putValid(char * data, size_t len);
+
+ void nextImpl() override;
+ /// Final flush; invoked from the destructor.
+ void finish();
+
+public:
+ /// Default size of the internal staging buffer.
+ static const size_t DEFAULT_SIZE;
+
+ /// The default replacement is the UTF-8 encoding of U+FFFD (REPLACEMENT CHARACTER).
+ WriteBufferValidUTF8(
+ WriteBuffer & output_buffer_,
+ bool group_replacements_ = true,
+ const char * replacement_ = "\xEF\xBF\xBD",
+ size_t size = DEFAULT_SIZE);
+
+ ~WriteBufferValidUTF8() override;
+};
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
new file mode 100644
index 00000000000..6bb86ee9343
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp
@@ -0,0 +1,157 @@
+#include <IO/WriteHelpers.h>
+#include <IO/WriteBufferValidUTF8.h>
+#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
+#include <Formats/FormatFactory.h>
+
+
+namespace NDB
+{
+
+
+/// Stores a copy of the settings and pre-renders every column name of the header
+/// as a JSON string into `fields`, so that writeField() can emit the keys verbatim.
+JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
+ WriteBuffer & out_,
+ const Block & header_,
+ const RowOutputFormatParams & params_,
+ const FormatSettings & settings_)
+ : IRowOutputFormat(header_, out_, params_),
+ settings(settings_)
+{
+ const auto & sample = getPort(PortKind::Main).getHeader();
+ size_t columns = sample.columns();
+ fields.resize(columns);
+
+ for (size_t i = 0; i < columns; ++i)
+ {
+ /// `buf` writes straight into fields[i].
+ WriteBufferFromString buf(fields[i]);
+ writeJSONString(sample.getByPosition(i).name, buf, settings);
+ }
+}
+
+
+/// Writes one `"name":value` pair for the field at position `field_number`
+/// and advances `field_number`.
+/// With `json.serialize_as_strings` the value is rendered as text first and then
+/// emitted as a JSON string; otherwise the column's JSON serialization is used.
+void JSONEachRowRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
+{
+ /// The key was already JSON-quoted in the constructor.
+ writeString(fields[field_number], out);
+ writeChar(':', out);
+
+ if (settings.json.serialize_as_strings)
+ {
+ WriteBufferFromOwnString buf;
+
+ serialization.serializeText(column, row_num, buf, settings);
+ writeJSONString(buf.str(), out, settings);
+ }
+ else
+ serialization.serializeTextJSON(column, row_num, out, settings);
+
+ ++field_number;
+}
+
+
+/// Writes the ',' that separates two fields of the same row object.
+void JSONEachRowRowOutputFormat::writeFieldDelimiter()
+{
+ writeChar(',', out);
+}
+
+
+/// Opens the JSON object of a row.
+void JSONEachRowRowOutputFormat::writeRowStartDelimiter()
+{
+ writeChar('{', out);
+}
+
+
+/// Closes the JSON object of a row and resets the field counter for the next row.
+/// Outside of array mode the closing brace is followed by a newline.
+void JSONEachRowRowOutputFormat::writeRowEndDelimiter()
+{
+ // Why do we need this weird `if`?
+ //
+ // The reason is the formatRow function that is broken with respect to
+ // row-between delimiters. It should not write them, but it does, and then
+ // hacks around it by having a special formatRowNoNewline version, which, as
+ // you guessed, removes the newline from the end of row. But the row-between
+ // delimiter goes into a second row, so it turns out to be in the beginning
+ // of the line, and the removal doesn't work. There is also a second bug --
+ // the row-between delimiter in this format is written incorrectly. In fact,
+ // it is not written at all, and the newline is written in a row-end
+ // delimiter ("}\n" instead of the correct "}"). With these two bugs
+ // combined, the test 01420_format_row works perfectly.
+ //
+ // A proper implementation of formatRow would use IRowOutputFormat directly,
+ // and not write row-between delimiters, instead of using IOutputFormat
+ // processor and its crutch row callback. This would require exposing
+ // IRowOutputFormat, which we don't do now, but which can be generally useful
+ // for other cases such as parallel formatting, that also require a control
+ // flow different from the usual IOutputFormat.
+ //
+ // I just don't have time or energy to redo all of this, but I need to
+ // support JSON array output here, which requires proper ",\n" row-between
+ // delimiters. For compatibility, I preserve the bug in case of non-array
+ // output.
+ if (settings.json.array_of_rows)
+ {
+ writeCString("}", out);
+ }
+ else
+ {
+ writeCString("}\n", out);
+ }
+ // The next row starts again with the first column name.
+ field_number = 0;
+}
+
+
+/// Writes ",\n" between rows in array mode; writes nothing otherwise, because the
+/// row-end delimiter already carries the newline in that case.
+void JSONEachRowRowOutputFormat::writeRowBetweenDelimiter()
+{
+ // We preserve an existing bug here for compatibility. See the comment above.
+ if (settings.json.array_of_rows)
+ {
+ writeCString(",\n", out);
+ }
+}
+
+
+/// Opens the enclosing JSON array when `json.array_of_rows` is set; otherwise writes nothing.
+void JSONEachRowRowOutputFormat::writePrefix()
+{
+ if (settings.json.array_of_rows)
+ {
+ writeCString("[\n", out);
+ }
+}
+
+
+/// Closes the enclosing JSON array when `json.array_of_rows` is set; otherwise writes nothing.
+void JSONEachRowRowOutputFormat::writeSuffix()
+{
+ if (settings.json.array_of_rows)
+ {
+ writeCString("\n]\n", out);
+ }
+}
+
+
+/// Registers the row-per-line JSON output formats in `factory`:
+///  - "json_each_row": values are written with their JSON serialization;
+///  - "JSONStringsEachRow": every value is written as a JSON string.
+/// Both formats are marked as supporting parallel formatting.
+void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
+{
+    factory.registerOutputFormatProcessor("json_each_row", [](
+        WriteBuffer & buf,
+        const Block & sample,
+        const RowOutputFormatParams & params,
+        const FormatSettings & _format_settings)
+    {
+        FormatSettings settings = _format_settings;
+        settings.json.serialize_as_strings = false;
+        return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
+            settings);
+    });
+    factory.markOutputFormatSupportsParallelFormatting("json_each_row");
+
+    factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
+        WriteBuffer & buf,
+        const Block & sample,
+        const RowOutputFormatParams & params,
+        const FormatSettings & _format_settings)
+    {
+        FormatSettings settings = _format_settings;
+        settings.json.serialize_as_strings = true;
+        return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
+            settings);
+    });
+    /// The name has to match the one registered just above; the misspelled
+    /// "JSONStringEachRow" flagged a format that is never registered here.
+    factory.markOutputFormatSupportsParallelFormatting("JSONStringsEachRow");
+}
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h
new file mode 100644
index 00000000000..133b7b8323b
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <Core/Block.h>
+#include <IO/WriteBuffer.h>
+#include <Processors/Formats/IRowOutputFormat.h>
+#include <Formats/FormatSettings.h>
+
+
+namespace NDB
+{
+
+/** The stream for outputting data in JSON format, by object per line.
+ * Does not validate UTF-8.
+ */
+class JSONEachRowRowOutputFormat : public IRowOutputFormat
+{
+public:
+ /// Keeps a copy of `settings_`; the header's column names become the JSON keys.
+ JSONEachRowRowOutputFormat(
+ WriteBuffer & out_,
+ const Block & header_,
+ const RowOutputFormatParams & params_,
+ const FormatSettings & settings_);
+
+ String getName() const override { return "JSONEachRowRowOutputFormat"; }
+
+ /// Row-level hooks that produce the pieces of one JSON object per row.
+ void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
+ void writeFieldDelimiter() override;
+ void writeRowStartDelimiter() override;
+ void writeRowEndDelimiter() override;
+ void writeRowBetweenDelimiter() override;
+ /// Array brackets, written only when `json.array_of_rows` is set.
+ void writePrefix() override;
+ void writeSuffix() override;
+
+protected:
+ /// No totals and extremes.
+ void consumeTotals(Chunk) override {}
+ void consumeExtremes(Chunk) override {}
+
+ /// Index of the next field within the current row; reset at the end of each row.
+ size_t field_number = 0;
+
+private:
+ /// Column names already rendered as JSON strings, one per header column.
+ Names fields;
+
+ FormatSettings settings;
+};
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h
index de41f52cd72..de41f52cd72 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h