aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-06-10 12:22:19 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-06-10 12:22:19 +0300
commitcc71ba34a990d5d652ca708abe5ce94b716975d3 (patch)
treeb07cdbdeb67639b23de2418d94ace16a75777b3f
parentee8c4bc836e1279aa849d82bb9c6a42704368c83 (diff)
downloadydb-cc71ba34a990d5d652ca708abe5ce94b716975d3.tar.gz
YQ-1099 + JSONAsString.
ref:d15ed6246d90dbbf37bb9ade813edc274b15cdb4
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp5
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt3
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp9
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp3
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp196
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h38
7 files changed, 253 insertions, 3 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 138b615701..8d88cfaacb 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -22,16 +22,17 @@ namespace NCommon {
using namespace NNodes;
namespace {
- std::array<std::string_view, 7> Formats = {
+ constexpr std::array<std::string_view, 8> Formats = {
"csv_with_names"sv,
"tsv_with_names"sv,
"json_list"sv,
"json"sv,
"raw"sv,
+ "json_as_string"sv,
"json_each_row"sv,
"parquet"sv
};
- std::array<std::string_view, 6> Compressions = {
+ constexpr std::array<std::string_view, 6> Compressions = {
"gzip"sv,
"zstd"sv,
"lz4"sv,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index ee5c34cc12..f3b0309ee3 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -580,6 +580,8 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type) {
case NUdf::EDataSlot::Double:
return std::make_shared<NDB::DataTypeFloat64>();
case NUdf::EDataSlot::String:
+ case NUdf::EDataSlot::Utf8:
+ case NUdf::EDataSlot::Json:
return std::make_shared<NDB::DataTypeString>();
case NUdf::EDataSlot::Date:
case NUdf::EDataSlot::TzDate:
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
index f3f0f3b7e2..20d1860daa 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
+++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt
@@ -32,6 +32,7 @@ target_link_libraries(clickhouse_client_udf INTERFACE
libs-apache-arrow
libs-apache-orc
libs-apache-avro
+ yql-minikql-dom
library-yql-utils
)
@@ -87,6 +88,7 @@ target_link_libraries(clickhouse_client_udf.global PUBLIC
libs-apache-arrow
libs-apache-orc
libs-apache-avro
+ yql-minikql-dom
library-yql-utils
)
target_sources(clickhouse_client_udf.global PRIVATE
@@ -433,6 +435,7 @@ target_sources(clickhouse_client_udf.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index 396490267d..cf16ed544c 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -1,6 +1,7 @@
#include <ydb/library/yql/public/udf/udf_helpers.h>
#include <ydb/library/yql/public/udf/udf_type_printer.h>
#include <ydb/library/yql/utils/utf8.h>
+#include <ydb/library/yql/minikql/dom/json.h>
#include <Poco/Util/Application.h>
@@ -190,6 +191,8 @@ NDB::DataTypePtr MetaToClickHouse(const TColumnMeta& meta) {
ret = std::make_shared<NDB::DataTypeFloat64>();
break;
case EDataSlot::String:
+ case EDataSlot::Utf8:
+ case EDataSlot::Json:
ret = std::make_shared<NDB::DataTypeString>();
break;
case EDataSlot::Date:
@@ -309,6 +312,12 @@ TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta&
}
return valueBuilder->NewString({ ref.data, (ui32)ref.size }).Release();
}
+ else if (slot == EDataSlot::Json) {
+ if (!NDom::IsValidJson(std::string_view(ref))) {
+ ythrow yexception() << "Bad Json.";
+ }
+ return valueBuilder->NewString({ ref.data, (ui32)ref.size }).Release();
+ }
else if (slot == EDataSlot::Uuid) {
char uuid[16];
PermuteUuid(ref.data, uuid, false);
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
index c597ecd731..70373cc27b 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.cpp
@@ -9,6 +9,7 @@ namespace NDB
{
void registerInputFormatProcessorNative(FormatFactory & factory);
+void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
@@ -26,6 +27,7 @@ void registerFormats()
const std::unique_lock lock(factory.getSync());
if (factory.getAllFormats().empty()) {
registerInputFormatProcessorNative(factory);
+ registerInputFormatProcessorJSONAsString(factory);
registerInputFormatProcessorJSONEachRow(factory);
registerInputFormatProcessorRawBLOB(factory);
registerInputFormatProcessorORC(factory);
@@ -39,4 +41,3 @@ void registerFormats()
}
}
-
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
new file mode 100644
index 0000000000..870180e42c
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
@@ -0,0 +1,196 @@
+#include <Processors/Formats/Impl/JSONAsStringRowInputFormat.h>
+#include <Formats/JSONEachRowUtils.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypeLowCardinality.h>
+#include <common/find_symbols.h>
+#include <IO/ReadHelpers.h>
+
+namespace NDB
+{
+
+namespace ErrorCodes
+{
+ extern const int BAD_ARGUMENTS;
+ extern const int INCORRECT_DATA;
+}
+
+JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
+ IRowInputFormat(header_, in_, std::move(params_)), buf(in)
+{
+ if (header_.columns() > 1)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS,
+ "This input format is only suitable for tables with a single column of type String but the number of columns is {}",
+ header_.columns());
+
+ if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type))))
+ throw Exception(ErrorCodes::BAD_ARGUMENTS,
+ "This input format is only suitable for tables with a single column of type String but the column type is {}",
+ header_.getByPosition(0).type->getName());
+}
+
+void JSONAsStringRowInputFormat::resetParser()
+{
+ IRowInputFormat::resetParser();
+ buf.reset();
+}
+
+void JSONAsStringRowInputFormat::readPrefix()
+{
+ /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
+ skipBOMIfExists(buf);
+
+ skipWhitespaceIfAny(buf);
+ if (!buf.eof() && *buf.position() == '[')
+ {
+ ++buf.position();
+ data_in_square_brackets = true;
+ }
+}
+
+void JSONAsStringRowInputFormat::readSuffix()
+{
+ skipWhitespaceIfAny(buf);
+ if (data_in_square_brackets)
+ {
+ assertChar(']', buf);
+ skipWhitespaceIfAny(buf);
+ }
+ if (!buf.eof() && *buf.position() == ';')
+ {
+ ++buf.position();
+ skipWhitespaceIfAny(buf);
+ }
+ assertEOF(buf);
+}
+
+void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
+{
+ PeekableReadBufferCheckpoint checkpoint{buf};
+ size_t balance = 0;
+ bool quotes = false;
+
+ if (*buf.position() != '{')
+ throw Exception("JSON object must begin with '{'.", ErrorCodes::INCORRECT_DATA);
+
+ ++buf.position();
+ ++balance;
+
+ char * pos;
+
+ while (balance)
+ {
+ if (buf.eof())
+ throw Exception("Unexpected end of file while parsing JSON object.", ErrorCodes::INCORRECT_DATA);
+
+ if (quotes)
+ {
+ pos = find_first_symbols<'"', '\\'>(buf.position(), buf.buffer().end());
+ buf.position() = pos;
+ if (buf.position() == buf.buffer().end())
+ continue;
+ if (*buf.position() == '"')
+ {
+ quotes = false;
+ ++buf.position();
+ }
+ else if (*buf.position() == '\\')
+ {
+ ++buf.position();
+ if (!buf.eof())
+ {
+ ++buf.position();
+ }
+ }
+ }
+ else
+ {
+ pos = find_first_symbols<'"', '{', '}', '\\'>(buf.position(), buf.buffer().end());
+ buf.position() = pos;
+ if (buf.position() == buf.buffer().end())
+ continue;
+ if (*buf.position() == '{')
+ {
+ ++balance;
+ ++buf.position();
+ }
+ else if (*buf.position() == '}')
+ {
+ --balance;
+ ++buf.position();
+ }
+ else if (*buf.position() == '\\')
+ {
+ ++buf.position();
+ if (!buf.eof())
+ {
+ ++buf.position();
+ }
+ }
+ else if (*buf.position() == '"')
+ {
+ quotes = true;
+ ++buf.position();
+ }
+ }
+ }
+ buf.makeContinuousMemoryFromCheckpointToPos();
+ char * end = buf.position();
+ buf.rollbackToCheckpoint();
+ column.insertData(buf.position(), end - buf.position());
+ buf.position() = end;
+}
+
+bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
+{
+ if (!allow_new_rows)
+ return false;
+
+ skipWhitespaceIfAny(buf);
+ if (!buf.eof())
+ {
+ if (!data_in_square_brackets && *buf.position() == ';')
+ {
+ /// ';' means the end of query, but it cannot be before ']'.
+ return allow_new_rows = false;
+ }
+ else if (data_in_square_brackets && *buf.position() == ']')
+ {
+ /// ']' means the end of query.
+ return allow_new_rows = false;
+ }
+ }
+
+ if (!buf.eof())
+ readJSONObject(*columns[0]);
+
+ skipWhitespaceIfAny(buf);
+ if (!buf.eof() && *buf.position() == ',')
+ ++buf.position();
+ skipWhitespaceIfAny(buf);
+
+ return !buf.eof();
+}
+
+void registerInputFormatProcessorJSONAsString(FormatFactory & factory)
+{
+ factory.registerInputFormatProcessor("json_as_string", [](
+ ReadBuffer & buf,
+ const Block & sample,
+ const RowInputFormatParams & params,
+ const FormatSettings &)
+ {
+ return std::make_shared<JSONAsStringRowInputFormat>(sample, buf, params);
+ });
+}
+
+void registerFileSegmentationEngineJSONAsString(FormatFactory & factory)
+{
+ factory.registerFileSegmentationEngine("JSONAsString", &fileSegmentationEngineJSONEachRowImpl);
+}
+
+void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factory)
+{
+ factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsString", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
+}
+
+}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h
new file mode 100644
index 0000000000..9b5cd9ae7e
--- /dev/null
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <Processors/Formats/IRowInputFormat.h>
+#include <Formats/FormatFactory.h>
+#include <IO/PeekableReadBuffer.h>
+
+namespace NDB
+{
+
+class ReadBuffer;
+
+/// This format parses a sequence of JSON objects separated by newlines, spaces and/or comma.
+/// Each JSON object is parsed as a whole to string.
+/// This format can only parse a table with single field of type String.
+
+class JSONAsStringRowInputFormat : public IRowInputFormat
+{
+public:
+ JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
+
+ bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
+ String getName() const override { return "JSONAsStringRowInputFormat"; }
+ void resetParser() override;
+
+ void readPrefix() override;
+ void readSuffix() override;
+
+private:
+ void readJSONObject(IColumn & column);
+
+ PeekableReadBuffer buf;
+
+ /// This flag is needed to know if data is in square brackets.
+ bool data_in_square_brackets = false;
+ bool allow_new_rows = true;
+};
+
+}