diff options
author | a-romanov <[email protected]> | 2022-07-11 19:08:43 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-07-11 19:08:43 +0300 |
commit | 5a88a82f28dde161dd304636e9a6ed585b2022eb (patch) | |
tree | 9a97c1d4f7c8b3e94a5ab4b9a9bfb939669da59a | |
parent | c68a62be7a6ee64daea4e67cefee29c01b941c0e (diff) |
SerializeFormat first draft.
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp | 252 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp | 142 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h | 41 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp | 157 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h | 47 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h (renamed from ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h) | 0 |
9 files changed, 610 insertions, 35 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index dabdb9eaeb3..503947c64f5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -15,7 +15,7 @@ #include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/ColumnsWithTypeAndName.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatFactory.h> -#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h> #endif #include "yql_s3_read_actor.h" diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt index 48ca0a7ec00..489a1921907 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt @@ -221,6 +221,7 @@ target_sources(clickhouse_client_udf.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferFromFileDescriptor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferFromFileDescriptorDiscardOnFailure.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferFromPocoSocket.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Compression/CompressionCodecLZ4.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Compression/CompressionCodecMultiple.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Compression/CompressionCodecNone.cpp @@ -436,6 +437,7 @@ target_sources(clickhouse_client_udf.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/RawBLOBRowInputFormat.cpp diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index cab257b952a..d10c8148261 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -32,8 +32,9 @@ #include "src/Core/ColumnsWithTypeAndName.h" #include "src/Formats/FormatFactory.h" -#include "src/Formats/InputStreamFromInputFormat.h" #include "src/Formats/registerFormats.h" +#include "src/Processors/Formats/InputStreamFromInputFormat.h" +#include "src/Processors/Formats/OutputStreamToOutputFormat.h" #include <util/generic/yexception.h> #include <util/string/split.h> @@ -215,11 +216,12 @@ NDB::DataTypePtr MetaToClickHouse(const TColumnMeta& meta) { return ret; } -void PermuteUuid(const char* src, char* dst, bool forward) { - static ui32 Pairs[16] = { 4, 5, 6, 7, 2, 3, 0, 1, 15, 14, 13, 12, 11, 10, 9, 8 }; - static ui32 InvPairs[16] = { 6, 7, 4, 5, 0, 1, 2, 3, 15, 14, 13, 12, 11, 10, 9, 8 }; +template<bool Forward> +void PermuteUuid(const char* src, char* dst) { + static constexpr ui32 Pairs[16] = { 4, 5, 6, 7, 2, 3, 0, 1, 15, 14, 13, 12, 11, 10, 9, 8 }; + static constexpr ui32 InvPairs[16] = { 6, 7, 4, 5, 0, 1, 2, 3, 15, 14, 13, 12, 11, 10, 9, 8 }; for (ui32 i = 0; i < 16; ++i) { - dst[forward ? Pairs[i] : InvPairs[i]] = src[i]; + dst[Forward ? Pairs[i] : InvPairs[i]] = src[i]; } } @@ -320,7 +322,7 @@ TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta& } else if (slot == EDataSlot::Uuid) { char uuid[16]; - PermuteUuid(ref.data, uuid, false); + PermuteUuid<false>(ref.data, uuid); return valueBuilder->NewString({ uuid, sizeof(uuid) }).Release(); } else if (slot == EDataSlot::Decimal) { @@ -360,6 +362,49 @@ TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta& } } +void ConvertInputValue(const TUnboxedValuePod& value, const NDB::IColumn::MutablePtr& col, const TColumnMeta& meta) { + if (meta.IsOptional && !value) + return col->insert(NDB::Field()); + + switch (*meta.Slot) { + case EDataSlot::Utf8: + case EDataSlot::Json: + case EDataSlot::String: + return col->insert(NDB::Field(value.AsStringRef())); + case EDataSlot::Bool: + return col->insert(NDB::Field(value.Get<bool>())); + case EDataSlot::Uint8: + return col->insert(NDB::Field(value.Get<ui8>())); + case EDataSlot::Uint16: + case EDataSlot::Date: + case EDataSlot::TzDate: + return col->insert(NDB::Field(value.Get<ui16>())); + case EDataSlot::Uint32: + case EDataSlot::Datetime: + case EDataSlot::TzDatetime: + return col->insert(NDB::Field(value.Get<ui32>())); + case EDataSlot::Uint64: + case EDataSlot::Timestamp: + case EDataSlot::TzTimestamp: + return col->insert(NDB::Field(value.Get<ui64>())); + case EDataSlot::Int8: + return col->insert(NDB::Field(value.Get<i8>())); + case EDataSlot::Int16: + return col->insert(NDB::Field(value.Get<i16>())); + case EDataSlot::Int32: + return col->insert(NDB::Field(value.Get<i32>())); + case EDataSlot::Int64: + return col->insert(NDB::Field(value.Get<i64>())); + case EDataSlot::Uuid: { + NDB::UUID uuid; + PermuteUuid<true>(value.AsStringRef().Data(), reinterpret_cast<char*>(&uuid)); + return col->insert(NDB::Field(uuid)); + } + default: + UdfTerminate("TODO: Unsupported field type."); + } +} + class TParseFromYdb : public TBoxedValue { public: class TStreamValue : public TBoxedValue { @@ -539,6 +584,31 @@ private: size_t CurrentRow = 0U; }; +NDB::FormatSettings GetFormatSettings(const std::string_view& view) { + NDB::FormatSettings settings; + settings.skip_unknown_fields = true; + settings.with_names_use_header = true; + if (!view.empty()) { + const std::string str(view); + const JSON json(str); +#define SUPPORTED_FLAGS(xx) \ + xx(skip_unknown_fields) \ + xx(import_nested_json) \ + xx(with_names_use_header) \ + xx(null_as_default) \ + +#define SET_FLAG(flag) \ + if (json.has(#flag)) \ + settings.flag = json[#flag].get<bool>(); + + SUPPORTED_FLAGS(SET_FLAG) + +#undef SET_FLAG +#undef SUPPORTED_FLAGS + } + return settings; +} + class TParseFormat : public TBoxedValue { public: TParseFormat(const std::string_view& type, const std::string_view& settings, @@ -569,31 +639,6 @@ public: UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); } private: - static NDB::FormatSettings GetFormatSettings(const std::string_view& view) { - NDB::FormatSettings settings; - settings.skip_unknown_fields = true; - settings.with_names_use_header = true; - if (!view.empty()) { - const std::string str(view); - const JSON json(str); - #define SUPPORTED_FLAGS(xx) \ - xx(skip_unknown_fields) \ - xx(import_nested_json) \ - xx(with_names_use_header) \ - xx(null_as_default) \ - - #define SET_FLAG(flag) \ - if (json.has(#flag)) \ - settings.flag = json[#flag].get<bool>(); - - SUPPORTED_FLAGS(SET_FLAG) - - #undef SET_FLAG - #undef SUPPORTED_FLAGS - } - return settings; - } - const std::string Type; const NDB::FormatSettings Settings; const TSourcePosition Pos; @@ -700,6 +745,93 @@ private: const ui32 TupleSize; }; +class TSerializeFormat : public TBoxedValue { + class TStreamValue : public TBoxedValue { + public: + TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos) + : ValueBuilder(valueBuilder) + , Stream(stream) + , InMeta(inMeta) + , Pos(pos) + , Buffer(std::make_unique<NDB::WriteBufferFromOwnString>()) + , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, NDB::Block(columns), nullptr, {}, settings))) + , CurrentBlock(columns) + { + BlockStream->writePrefix(); + } + private: + EFetchStatus Fetch(TUnboxedValue& result) final try { + if (IsFinished) + return EFetchStatus::Finish; + + auto columns = CurrentBlock.mutateColumns(); + for (TUnboxedValue row; !IsFinished;) { + switch (const auto status = Stream.Fetch(row)) { + case EFetchStatus::Yield: + return EFetchStatus::Yield; + case EFetchStatus::Ok: + for (auto i = 0U; i < columns.size(); ++i) + ConvertInputValue(row.GetElement(i), columns[i], InMeta[i]); + continue; + case EFetchStatus::Finish: + IsFinished = true; + BlockStream->writeSuffix(); + BlockStream->flush(); + break; + } + } + + CurrentBlock.setColumns(std::move(columns)); + BlockStream->write(CurrentBlock); + BlockStream->flush(); + CurrentBlock = CurrentBlock.cloneEmpty(); + + result = ValueBuilder->NewString(Buffer->str()); + Buffer->restart(); + return EFetchStatus::Ok; + } + catch (const Poco::Exception& e) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); + } + catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); + } + + const IValueBuilder* ValueBuilder; + const TUnboxedValue Stream; + const std::vector<TColumnMeta> InMeta; + const TSourcePosition Pos; + + const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer; + const std::unique_ptr<NDB::IBlockOutputStream> BlockStream; + + NDB::Block CurrentBlock; + bool IsFinished = false; + }; + +public: + TSerializeFormat(const std::string_view& type, const std::string_view& settings, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns) + : Type(type), Settings(GetFormatSettings(settings)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns)) + {} + + TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { + return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, InMeta, Columns, Pos)); + } + catch (const Poco::Exception& e) { + UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); + } + catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); + } +private: + const std::string Type; + const NDB::FormatSettings Settings; + + const TSourcePosition Pos; + const std::vector<TColumnMeta> InMeta; + const NDB::ColumnsWithTypeAndName Columns; +}; + struct TCHInitializer { using TWeakPtr = std::weak_ptr<TCHInitializer>; using TPtr = std::shared_ptr<TCHInitializer>; @@ -838,7 +970,7 @@ public: TClickHouseClientModule() = default; static const TStringRef& Name() { - static const auto name = TStringRef::Of("ClickHouseClient"); + static constexpr auto name = TStringRef::Of("ClickHouseClient"); return name; } @@ -849,6 +981,7 @@ public: sink.Add(TStringRef::Of("ParseFormat"))->SetTypeAwareness(); sink.Add(TStringRef::Of("ParseBlocks"))->SetTypeAwareness(); sink.Add(TStringRef::Of("ParseFromYdb"))->SetTypeAwareness(); + sink.Add(TStringRef::Of("SerializeFormat"))->SetTypeAwareness(); } void BuildFunctionTypeInfo( @@ -859,7 +992,6 @@ public: IFunctionTypeInfoBuilder& builder) const final try { LazyInitContext(); auto argBuilder = builder.Args(); - if (name == "ToYqlType") { argBuilder->Add<TUtf8>(); argBuilder->Add<TUtf8>(); @@ -1013,6 +1145,58 @@ public: TTypePrinter(*typeHelper, resultType).Out(sb.Out); return builder.SetError(sb); } + } else if (name == "SerializeFormat") { + const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); + if (!userTypeInspector || userTypeInspector.GetElementsCount() < 1) { + return builder.SetError("Invalid user type."); + } + + const auto argsTypeTuple = userTypeInspector.GetElementType(0); + const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple); + if (!argsTypeInspector) { + return builder.SetError("Invalid user type - expected tuple."); + } + + if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount != 1) { + ::TStringBuilder sb; + sb << "Invalid user type - expected one or two arguments, got: " << argsCount; + return builder.SetError(sb); + } + + const auto inputType = argsTypeInspector.GetElementType(0); + + builder.UserType(userType); + builder.Args()->Add(inputType).Done(); + builder.Returns(builder.Stream()->Item<char*>()); + + if (const auto structType = TStructTypeInspector(*typeHelper, TStreamTypeInspector(*typeHelper, inputType).GetItemType())) { + std::vector<TColumnMeta> outMeta(structType.GetMembersCount()); + NDB::ColumnsWithTypeAndName columns(structType.GetMembersCount()); + for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) { + if (auto& meta = outMeta[i]; GetDataType(*typeHelper, structType.GetMemberType(i), meta)) { + auto& colsumn = columns[i]; + colsumn.type = MetaToClickHouse(meta); + colsumn.name = structType.GetMemberName(i); + } else { + ::TStringBuilder sb; + sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: "; + TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out); + return builder.SetError(sb); + } + } + + if (!(flags & TFlags::TypesOnly)) { + const std::string_view& typeCfg = typeConfig; + const auto jsonFrom = typeCfg.find('{'); + builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns))); + } + return; + } else { + ::TStringBuilder sb; + sb << "Incompatible row type: "; + TTypePrinter(*typeHelper, inputType).Out(sb.Out); + return builder.SetError(sb); + } } else if (name == "ParseFromYdb") { builder.UserType(userType); builder.Args()->Add(builder.Stream()->Item<char*>()).Add<TOptional<TUtf8>>().Done(); @@ -1040,7 +1224,7 @@ public: builder.Implementation(new TParseFromYdb(builder.GetSourcePosition(), std::move(columns))); } return; - } else { + } else { ::TStringBuilder sb; sb << "Incompatible row type: "; TTypePrinter(*typeHelper, userType).Out(sb.Out); diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp index 70373cc27b6..9dc0db62432 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp @@ -11,6 +11,7 @@ namespace NDB void registerInputFormatProcessorNative(FormatFactory & factory); void registerInputFormatProcessorJSONAsString(FormatFactory & factory); void registerInputFormatProcessorJSONEachRow(FormatFactory & factory); +void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory); void registerInputFormatProcessorRawBLOB(FormatFactory & factory); void registerInputFormatProcessorORC(FormatFactory & factory); void registerInputFormatProcessorArrow(FormatFactory & factory); @@ -29,6 +30,7 @@ void registerFormats() registerInputFormatProcessorNative(factory); registerInputFormatProcessorJSONAsString(factory); registerInputFormatProcessorJSONEachRow(factory); + registerOutputFormatProcessorJSONEachRow(factory); registerInputFormatProcessorRawBLOB(factory); registerInputFormatProcessorORC(factory); registerInputFormatProcessorArrow(factory); diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp new file mode 100644 index 00000000000..89479042f4a --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.cpp @@ -0,0 +1,142 @@ +#include <Poco/UTF8Encoding.h> +#include <IO/WriteBufferValidUTF8.h> +#include <Common/MemoryTracker.h> +#include <common/types.h> + +#ifdef __SSE2__ + #include <emmintrin.h> +#endif + + +namespace NDB +{ + +const size_t WriteBufferValidUTF8::DEFAULT_SIZE = 4096; + +/** Index into the table below with the first byte of a UTF-8 sequence to + * get the number of trailing bytes that are supposed to follow it. + * Note that *legal* UTF-8 values can't have 4 or 5-bytes. The table is + * left as-is for anyone who may want to do such conversion, which was + * allowed in earlier algorithms. + */ +extern const UInt8 length_of_utf8_sequence[256] = +{ + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, + 3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4,5,5,5,5,6,6,6,6 +}; + + +WriteBufferValidUTF8::WriteBufferValidUTF8( + WriteBuffer & output_buffer_, bool group_replacements_, const char * replacement_, size_t size) + : BufferWithOwnMemory<WriteBuffer>(std::max(static_cast<size_t>(32), size)), output_buffer(output_buffer_), + group_replacements(group_replacements_), replacement(replacement_) +{ +} + + +inline void WriteBufferValidUTF8::putReplacement() +{ + if (replacement.empty() || (group_replacements && just_put_replacement)) + return; + + just_put_replacement = true; + output_buffer.write(replacement.data(), replacement.size()); +} + + +inline void WriteBufferValidUTF8::putValid(char *data, size_t len) +{ + if (len == 0) + return; + + just_put_replacement = false; + output_buffer.write(data, len); +} + + +void WriteBufferValidUTF8::nextImpl() +{ + char * p = memory.data(); + char * valid_start = p; + + while (p < pos) + { +#ifdef __SSE2__ + /// Fast skip of ASCII + static constexpr size_t SIMD_BYTES = 16; + const char * simd_end = p + (pos - p) / SIMD_BYTES * SIMD_BYTES; + + while (p < simd_end && !_mm_movemask_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i*>(p)))) + p += SIMD_BYTES; + + if (!(p < pos)) + break; +#endif + + size_t len = length_of_utf8_sequence[static_cast<unsigned char>(*p)]; + + if (len > 4) + { // NOLINT + /// Invalid start of sequence. Skip one byte. + putValid(valid_start, p - valid_start); + putReplacement(); + ++p; + valid_start = p; + } + else if (p + len > pos) + { + /// Sequence was not fully written to this buffer. + break; + } + else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len)) + { + /// Valid sequence. + p += len; + } + else + { + /// Invalid sequence. Skip just first byte. + putValid(valid_start, p - valid_start); + putReplacement(); + ++p; + valid_start = p; + } + } + + putValid(valid_start, p - valid_start); + + size_t cnt = pos - p; + + /// Shift unfinished sequence to start of buffer. + for (size_t i = 0; i < cnt; ++i) + memory[i] = p[i]; + + working_buffer = Buffer(&memory[cnt], memory.data() + memory.size()); +} + + +void WriteBufferValidUTF8::finish() +{ + /// Write all complete sequences from buffer. + nextImpl(); + + /// If unfinished sequence at end, then write replacement. + if (working_buffer.begin() != memory.data()) + putReplacement(); +} + + +WriteBufferValidUTF8::~WriteBufferValidUTF8() +{ + /// FIXME move final flush into the caller + MemoryTracker::LockExceptionInThread lock(VariableContext::Global); + finish(); +} + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h new file mode 100644 index 00000000000..0e13bb5de0b --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteBufferValidUTF8.h @@ -0,0 +1,41 @@ +#pragma once + +#include <IO/WriteBuffer.h> +#include <IO/BufferWithOwnMemory.h> + + +namespace NDB +{ + +/** Writes the data to another buffer, replacing the invalid UTF-8 sequences with the specified sequence. + * If the valid UTF-8 is already written, it works faster. + * Note: before using the resulting string, destroy this object. + */ +class WriteBufferValidUTF8 final : public BufferWithOwnMemory<WriteBuffer> +{ +private: + WriteBuffer & output_buffer; + bool group_replacements; + /// The last recorded character was `replacement`. + bool just_put_replacement = false; + std::string replacement; + + void putReplacement(); + void putValid(char * data, size_t len); + + void nextImpl() override; + void finish(); + +public: + static const size_t DEFAULT_SIZE; + + WriteBufferValidUTF8( + WriteBuffer & output_buffer_, + bool group_replacements_ = true, + const char * replacement_ = "\xEF\xBF\xBD", + size_t size = DEFAULT_SIZE); + + ~WriteBufferValidUTF8() override; +}; + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp new file mode 100644 index 00000000000..6bb86ee9343 --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp @@ -0,0 +1,157 @@ +#include <IO/WriteHelpers.h> +#include <IO/WriteBufferValidUTF8.h> +#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h> +#include <Formats/FormatFactory.h> + + +namespace NDB +{ + + +JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat( + WriteBuffer & out_, + const Block & header_, + const RowOutputFormatParams & params_, + const FormatSettings & settings_) + : IRowOutputFormat(header_, out_, params_), + settings(settings_) +{ + const auto & sample = getPort(PortKind::Main).getHeader(); + size_t columns = sample.columns(); + fields.resize(columns); + + for (size_t i = 0; i < columns; ++i) + { + WriteBufferFromString buf(fields[i]); + writeJSONString(sample.getByPosition(i).name, buf, settings); + } +} + + +void JSONEachRowRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) +{ + writeString(fields[field_number], out); + writeChar(':', out); + + if (settings.json.serialize_as_strings) + { + WriteBufferFromOwnString buf; + + serialization.serializeText(column, row_num, buf, settings); + writeJSONString(buf.str(), out, settings); + } + else + serialization.serializeTextJSON(column, row_num, out, settings); + + ++field_number; +} + + +void JSONEachRowRowOutputFormat::writeFieldDelimiter() +{ + writeChar(',', out); +} + + +void JSONEachRowRowOutputFormat::writeRowStartDelimiter() +{ + writeChar('{', out); +} + + +void JSONEachRowRowOutputFormat::writeRowEndDelimiter() +{ + // Why do we need this weird `if`? + // + // The reason is the formatRow function that is broken with respect to + // row-between delimiters. It should not write them, but it does, and then + // hacks around it by having a special formatRowNoNewline version, which, as + // you guessed, removes the newline from the end of row. But the row-between + // delimiter goes into a second row, so it turns out to be in the beginning + // of the line, and the removal doesn't work. There is also a second bug -- + // the row-between delimiter in this format is written incorrectly. In fact, + // it is not written at all, and the newline is written in a row-end + // delimiter ("}\n" instead of the correct "}"). With these two bugs + // combined, the test 01420_format_row works perfectly. + // + // A proper implementation of formatRow would use IRowOutputFormat directly, + // and not write row-between delimiters, instead of using IOutputFormat + // processor and its crutch row callback. This would require exposing + // IRowOutputFormat, which we don't do now, but which can be generally useful + // for other cases such as parallel formatting, that also require a control + // flow different from the usual IOutputFormat. + // + // I just don't have time or energy to redo all of this, but I need to + // support JSON array output here, which requires proper ",\n" row-between + // delimiters. For compatibility, I preserve the bug in case of non-array + // output. + if (settings.json.array_of_rows) + { + writeCString("}", out); + } + else + { + writeCString("}\n", out); + } + field_number = 0; +} + + +void JSONEachRowRowOutputFormat::writeRowBetweenDelimiter() +{ + // We preserve an existing bug here for compatibility. See the comment above. + if (settings.json.array_of_rows) + { + writeCString(",\n", out); + } +} + + +void JSONEachRowRowOutputFormat::writePrefix() +{ + if (settings.json.array_of_rows) + { + writeCString("[\n", out); + } +} + + +void JSONEachRowRowOutputFormat::writeSuffix() +{ + if (settings.json.array_of_rows) + { + writeCString("\n]\n", out); + } +} + + +void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory) +{ + factory.registerOutputFormatProcessor("json_each_row", []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & _format_settings) + { + FormatSettings settings = _format_settings; + settings.json.serialize_as_strings = false; + return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, + settings); + }); + factory.markOutputFormatSupportsParallelFormatting("json_each_row"); + + factory.registerOutputFormatProcessor("JSONStringsEachRow", []( + WriteBuffer & buf, + const Block & sample, + const RowOutputFormatParams & params, + const FormatSettings & _format_settings) + { + FormatSettings settings = _format_settings; + settings.json.serialize_as_strings = true; + return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, + settings); + }); + factory.markOutputFormatSupportsParallelFormatting("JSONStringEachRow"); +} + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h new file mode 100644 index 00000000000..133b7b8323b --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h @@ -0,0 +1,47 @@ +#pragma once + +#include <Core/Block.h> +#include <IO/WriteBuffer.h> +#include <Processors/Formats/IRowOutputFormat.h> +#include <Formats/FormatSettings.h> + + +namespace NDB +{ + +/** The stream for outputting data in JSON format, by object per line. + * Does not validate UTF-8. + */ +class JSONEachRowRowOutputFormat : public IRowOutputFormat +{ +public: + JSONEachRowRowOutputFormat( + WriteBuffer & out_, + const Block & header_, + const RowOutputFormatParams & params_, + const FormatSettings & settings_); + + String getName() const override { return "JSONEachRowRowOutputFormat"; } + + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; + void writeFieldDelimiter() override; + void writeRowStartDelimiter() override; + void writeRowEndDelimiter() override; + void writeRowBetweenDelimiter() override; + void writePrefix() override; + void writeSuffix() override; + +protected: + /// No totals and extremes. + void consumeTotals(Chunk) override {} + void consumeExtremes(Chunk) override {} + + size_t field_number = 0; + +private: + Names fields; + + FormatSettings settings; +}; + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h index de41f52cd72..de41f52cd72 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h |