diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-06-10 12:22:19 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-06-10 12:22:19 +0300 |
commit | cc71ba34a990d5d652ca708abe5ce94b716975d3 (patch) | |
tree | b07cdbdeb67639b23de2418d94ace16a75777b3f | |
parent | ee8c4bc836e1279aa849d82bb9c6a42704368c83 (diff) | |
download | ydb-cc71ba34a990d5d652ca708abe5ce94b716975d3.tar.gz |
YQ-1099 + JSONAsString.
ref:d15ed6246d90dbbf37bb9ade813edc274b15cdb4
7 files changed, 253 insertions, 3 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 138b615701..8d88cfaacb 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -22,16 +22,17 @@ namespace NCommon { using namespace NNodes; namespace { - std::array<std::string_view, 7> Formats = { + constexpr std::array<std::string_view, 8> Formats = { "csv_with_names"sv, "tsv_with_names"sv, "json_list"sv, "json"sv, "raw"sv, + "json_as_string"sv, "json_each_row"sv, "parquet"sv }; - std::array<std::string_view, 6> Compressions = { + constexpr std::array<std::string_view, 6> Compressions = { "gzip"sv, "zstd"sv, "lz4"sv, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index ee5c34cc12..f3b0309ee3 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -580,6 +580,8 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type) { case NUdf::EDataSlot::Double: return std::make_shared<NDB::DataTypeFloat64>(); case NUdf::EDataSlot::String: + case NUdf::EDataSlot::Utf8: + case NUdf::EDataSlot::Json: return std::make_shared<NDB::DataTypeString>(); case NUdf::EDataSlot::Date: case NUdf::EDataSlot::TzDate: diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt index f3f0f3b7e2..20d1860daa 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt @@ -32,6 +32,7 @@ target_link_libraries(clickhouse_client_udf INTERFACE libs-apache-arrow libs-apache-orc libs-apache-avro + yql-minikql-dom library-yql-utils ) @@ -87,6 +88,7 @@ target_link_libraries(clickhouse_client_udf.global PUBLIC libs-apache-arrow libs-apache-orc libs-apache-avro + yql-minikql-dom library-yql-utils ) target_sources(clickhouse_client_udf.global PRIVATE @@ -433,6 +435,7 @@ target_sources(clickhouse_client_udf.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/AvroRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index 396490267d..cf16ed544c 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -1,6 +1,7 @@ #include <ydb/library/yql/public/udf/udf_helpers.h> #include <ydb/library/yql/public/udf/udf_type_printer.h> #include <ydb/library/yql/utils/utf8.h> +#include <ydb/library/yql/minikql/dom/json.h> #include <Poco/Util/Application.h> @@ -190,6 +191,8 @@ NDB::DataTypePtr MetaToClickHouse(const TColumnMeta& meta) { ret = std::make_shared<NDB::DataTypeFloat64>(); break; case EDataSlot::String: + case EDataSlot::Utf8: + case EDataSlot::Json: ret = std::make_shared<NDB::DataTypeString>(); break; case EDataSlot::Date: @@ -309,6 +312,12 @@ TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta& } return valueBuilder->NewString({ ref.data, (ui32)ref.size }).Release(); } + else if (slot == EDataSlot::Json) { + if (!NDom::IsValidJson(std::string_view(ref))) { + ythrow yexception() << "Bad Json."; + } + return valueBuilder->NewString({ ref.data, (ui32)ref.size }).Release(); + } else if (slot == EDataSlot::Uuid) { char uuid[16]; PermuteUuid(ref.data, uuid, false); diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp index c597ecd731..70373cc27b 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp @@ -9,6 +9,7 @@ namespace NDB { void registerInputFormatProcessorNative(FormatFactory & factory); +void registerInputFormatProcessorJSONAsString(FormatFactory & factory); void registerInputFormatProcessorJSONEachRow(FormatFactory & factory); void registerInputFormatProcessorRawBLOB(FormatFactory & factory); void registerInputFormatProcessorORC(FormatFactory & factory); @@ -26,6 +27,7 @@ void registerFormats() const std::unique_lock lock(factory.getSync()); if (factory.getAllFormats().empty()) { registerInputFormatProcessorNative(factory); + registerInputFormatProcessorJSONAsString(factory); registerInputFormatProcessorJSONEachRow(factory); registerInputFormatProcessorRawBLOB(factory); registerInputFormatProcessorORC(factory); @@ -39,4 +41,3 @@ void registerFormats() } } - diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp new file mode 100644 index 0000000000..870180e42c --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp @@ -0,0 +1,196 @@ +#include <Processors/Formats/Impl/JSONAsStringRowInputFormat.h> +#include <Formats/JSONEachRowUtils.h> +#include <DataTypes/DataTypeNullable.h> +#include <DataTypes/DataTypeLowCardinality.h> +#include <common/find_symbols.h> +#include <IO/ReadHelpers.h> + +namespace NDB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int INCORRECT_DATA; +} + +JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) : + IRowInputFormat(header_, in_, std::move(params_)), buf(in) +{ + if (header_.columns() > 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "This input format is only suitable for tables with a single column of type String but the number of columns is {}", + header_.columns()); + + if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type)))) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "This input format is only suitable for tables with a single column of type String but the column type is {}", + header_.getByPosition(0).type->getName()); +} + +void JSONAsStringRowInputFormat::resetParser() +{ + IRowInputFormat::resetParser(); + buf.reset(); +} + +void JSONAsStringRowInputFormat::readPrefix() +{ + /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. + skipBOMIfExists(buf); + + skipWhitespaceIfAny(buf); + if (!buf.eof() && *buf.position() == '[') + { + ++buf.position(); + data_in_square_brackets = true; + } +} + +void JSONAsStringRowInputFormat::readSuffix() +{ + skipWhitespaceIfAny(buf); + if (data_in_square_brackets) + { + assertChar(']', buf); + skipWhitespaceIfAny(buf); + } + if (!buf.eof() && *buf.position() == ';') + { + ++buf.position(); + skipWhitespaceIfAny(buf); + } + assertEOF(buf); +} + +void JSONAsStringRowInputFormat::readJSONObject(IColumn & column) +{ + PeekableReadBufferCheckpoint checkpoint{buf}; + size_t balance = 0; + bool quotes = false; + + if (*buf.position() != '{') + throw Exception("JSON object must begin with '{'.", ErrorCodes::INCORRECT_DATA); + + ++buf.position(); + ++balance; + + char * pos; + + while (balance) + { + if (buf.eof()) + throw Exception("Unexpected end of file while parsing JSON object.", ErrorCodes::INCORRECT_DATA); + + if (quotes) + { + pos = find_first_symbols<'"', '\\'>(buf.position(), buf.buffer().end()); + buf.position() = pos; + if (buf.position() == buf.buffer().end()) + continue; + if (*buf.position() == '"') + { + quotes = false; + ++buf.position(); + } + else if (*buf.position() == '\\') + { + ++buf.position(); + if (!buf.eof()) + { + ++buf.position(); + } + } + } + else + { + pos = find_first_symbols<'"', '{', '}', '\\'>(buf.position(), buf.buffer().end()); + buf.position() = pos; + if (buf.position() == buf.buffer().end()) + continue; + if (*buf.position() == '{') + { + ++balance; + ++buf.position(); + } + else if (*buf.position() == '}') + { + --balance; + ++buf.position(); + } + else if (*buf.position() == '\\') + { + ++buf.position(); + if (!buf.eof()) + { + ++buf.position(); + } + } + else if (*buf.position() == '"') + { + quotes = true; + ++buf.position(); + } + } + } + buf.makeContinuousMemoryFromCheckpointToPos(); + char * end = buf.position(); + buf.rollbackToCheckpoint(); + column.insertData(buf.position(), end - buf.position()); + buf.position() = end; +} + +bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) +{ + if (!allow_new_rows) + return false; + + skipWhitespaceIfAny(buf); + if (!buf.eof()) + { + if (!data_in_square_brackets && *buf.position() == ';') + { + /// ';' means the end of query, but it cannot be before ']'. + return allow_new_rows = false; + } + else if (data_in_square_brackets && *buf.position() == ']') + { + /// ']' means the end of query. + return allow_new_rows = false; + } + } + + if (!buf.eof()) + readJSONObject(*columns[0]); + + skipWhitespaceIfAny(buf); + if (!buf.eof() && *buf.position() == ',') + ++buf.position(); + skipWhitespaceIfAny(buf); + + return !buf.eof(); +} + +void registerInputFormatProcessorJSONAsString(FormatFactory & factory) +{ + factory.registerInputFormatProcessor("json_as_string", []( + ReadBuffer & buf, + const Block & sample, + const RowInputFormatParams & params, + const FormatSettings &) + { + return std::make_shared<JSONAsStringRowInputFormat>(sample, buf, params); + }); +} + +void registerFileSegmentationEngineJSONAsString(FormatFactory & factory) +{ + factory.registerFileSegmentationEngine("JSONAsString", &fileSegmentationEngineJSONEachRowImpl); +} + +void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory) +{ + factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsString", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl); +} + +} diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h new file mode 100644 index 0000000000..9b5cd9ae7e --- /dev/null +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h @@ -0,0 +1,38 @@ +#pragma once + +#include <Processors/Formats/IRowInputFormat.h> +#include <Formats/FormatFactory.h> +#include <IO/PeekableReadBuffer.h> + +namespace NDB +{ + +class ReadBuffer; + +/// This format parses a sequence of JSON objects separated by newlines, spaces and/or comma. +/// Each JSON object is parsed as a whole to string. +/// This format can only parse a table with single field of type String. + +class JSONAsStringRowInputFormat : public IRowInputFormat +{ +public: + JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); + + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + String getName() const override { return "JSONAsStringRowInputFormat"; } + void resetParser() override; + + void readPrefix() override; + void readSuffix() override; + +private: + void readJSONObject(IColumn & column); + + PeekableReadBuffer buf; + + /// This flag is needed to know if data is in square brackets. + bool data_in_square_brackets = false; + bool allow_new_rows = true; +}; + +} |