diff options
author | uzhas <uzhas@ydb.tech> | 2022-09-19 16:15:53 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-09-19 16:15:53 +0300 |
commit | 7495c426ef6955671d7ec18ac944a69ab180dab2 (patch) | |
tree | a04a75bb731f2d4ed04499d1bd246f19ddba8c35 | |
parent | ef5e00f267aa9f7b290586d8f6dc2653d824ea56 (diff) | |
download | ydb-7495c426ef6955671d7ec18ac944a69ab180dab2.tar.gz |
fix buffer ref on stack var
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp | 3217 |
1 files changed, 1609 insertions, 1608 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index 569cbc6a04c..59d62db4fed 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -1,1608 +1,1609 @@ -#include <ydb/library/yql/public/udf/udf_helpers.h> -#include <ydb/library/yql/public/udf/udf_type_printer.h> -#include <ydb/library/yql/utils/utf8.h> -#include <ydb/library/yql/minikql/dom/json.h> - -#include <Poco/Util/Application.h> - -#include "base/common/JSON.h" - -#include "src/Columns/ColumnAggregateFunction.h" -#include "src/Columns/ColumnArray.h" -#include "src/Columns/ColumnConst.h" -#include "src/Columns/ColumnNullable.h" -#include "src/Columns/ColumnTuple.h" -#include "src/Columns/ColumnsNumber.h" -#include "src/Columns/ColumnString.h" - -#include "src/DataStreams/NativeBlockInputStream.h" - -#include "src/DataTypes/DataTypeEnum.h" -#include "src/DataTypes/DataTypesNumber.h" -#include "src/DataTypes/DataTypeDate.h" -#include "src/DataTypes/DataTypeFactory.h" -#include "src/DataTypes/DataTypeArray.h" -#include "src/DataTypes/DataTypeNothing.h" -#include "src/DataTypes/DataTypeTuple.h" -#include "src/DataTypes/DataTypeNullable.h" -#include "src/DataTypes/DataTypeString.h" -#include "src/DataTypes/DataTypeUUID.h" - -#include "src/Core/Block.h" -#include "src/Core/ColumnsWithTypeAndName.h" - -#include "src/Formats/FormatFactory.h" -#include "src/Formats/registerFormats.h" -#include "src/Processors/Formats/InputStreamFromInputFormat.h" -#include "src/Processors/Formats/OutputStreamToOutputFormat.h" - -#include <util/generic/size_literals.h> -#include <util/generic/yexception.h> -#include <util/string/split.h> - -using namespace NYql; -using namespace NUdf; - -namespace { -struct TPgConvertInfo { - ui32 TypeId = 0; - TStringRef TypeName; - i32 TypeLen = 0; - const TType* SourceDataType = nullptr; - std::optional<EDataSlot> SourceLogicalSlot; // empty means that there is not corresponding type for pg type, e.g. point -}; - -struct TColumnMeta { - std::optional<TPgConvertInfo> PgConvertInfo; - std::optional<TString> Aggregation; - std::optional<EDataSlot> Slot; // empty if null type - bool IsOptional = false; - bool IsEmptyList = false; - bool IsList = false; - bool IsTuple = false; - ui8 Precision = 0, Scale = 0; // filled for Decimal - std::vector<TColumnMeta> Items; // filled if IsTuple or IsList - std::vector<TString> Enum; - std::unordered_map<i16, std::pair<TString, ui32>> EnumVar; -}; - -std::pair<std::optional<EDataSlot>, TType*> InferSlotFromPgType(IFunctionTypeInfoBuilder& typeInfoBuilder, const TStringRef typeName) { - if (typeName == TStringRef("bool")) { - return { EDataSlot::Bool, typeInfoBuilder.SimpleType<bool>() }; - } - if (typeName == TStringRef("int4")) { - return { EDataSlot::Int32, typeInfoBuilder.SimpleType<i32>() }; - } - if (typeName == TStringRef("int8")) { - return { EDataSlot::Int64, typeInfoBuilder.SimpleType<i64>() }; - } - if (typeName == TStringRef("float4")) { - return { EDataSlot::Float, typeInfoBuilder.SimpleType<float>() }; - } - if (typeName == TStringRef("float8")) { - return { EDataSlot::Double, typeInfoBuilder.SimpleType<double>() }; - } - if (typeName == TStringRef("text")) { - return { EDataSlot::Utf8, typeInfoBuilder.SimpleType<TUtf8>() }; - } - if (IsIn({ TStringRef("cstring"), TStringRef("varchar"), TStringRef("bytea") }, typeName)) { - return { EDataSlot::String, nullptr }; - } - return {}; -} - -TPgConvertInfo GetPgConvertInfo(IFunctionTypeInfoBuilder& typeInfoBuilder, const ITypeInfoHelper& typeHelper, const TType* type) { - TPgConvertInfo info; - info.TypeId = TPgTypeInspector(typeHelper, type).GetTypeId(); - auto* pgDescription = typeHelper.FindPgTypeDescription(info.TypeId); - Y_ENSURE(pgDescription); - info.TypeLen = pgDescription->Typelen; - info.TypeName = pgDescription->Name; - - std::tie(info.SourceLogicalSlot, info.SourceDataType) = InferSlotFromPgType(typeInfoBuilder, pgDescription->Name); - if (info.SourceDataType) { - info.SourceDataType = typeInfoBuilder.Optional()->Item(info.SourceDataType).Build(); - } - return info; -} - -template<bool MaybeOptional = true> -bool GetDataType(IFunctionTypeInfoBuilder& typeInfoBuilder, const ITypeInfoHelper& typeHelper, const TType* type, TColumnMeta& meta) { - switch (typeHelper.GetTypeKind(type)) { - case ETypeKind::Tuple: { - meta.IsTuple = true; - const TTupleTypeInspector tupleType(typeHelper, type); - meta.Items.resize(tupleType.GetElementsCount()); - for (auto i = 0U; i < meta.Items.size(); ++i) - if (!GetDataType(typeInfoBuilder, typeHelper, tupleType.GetElementType(i), meta.Items[i])) - return false; - return true; - } - case ETypeKind::List: - meta.IsList = true; - type = TListTypeInspector(typeHelper, type).GetItemType(); - meta.Items.resize(1U); - return GetDataType(typeInfoBuilder, typeHelper, type, meta.Items.front()); - case ETypeKind::Optional: - if constexpr (MaybeOptional) { - meta.IsOptional = true; - type = TOptionalTypeInspector(typeHelper, type).GetItemType(); - return GetDataType<false>(typeInfoBuilder, typeHelper, type, meta); - } - else - break; - case ETypeKind::Data: { - const TDataAndDecimalTypeInspector dataType(typeHelper, type); - meta.Slot = GetDataSlot(dataType.GetTypeId()); - meta.Precision = dataType.GetPrecision(); - meta.Scale = dataType.GetScale(); - return true; - } - case ETypeKind::Pg: { - meta.PgConvertInfo = GetPgConvertInfo(typeInfoBuilder, typeHelper, type); - return true; - } - default: - break; - } - return false; -} - -NDB::DataTypePtr PgMetaToClickHouse(const TPgConvertInfo& info) { - if (info.TypeName == TStringRef("bool")) { - return std::make_shared<NDB::DataTypeUInt8>(); - } - - if (info.TypeName == TStringRef("int4")) { - return std::make_shared<NDB::DataTypeInt32>(); - } - - if (info.TypeName == TStringRef("int8")) { - return std::make_shared<NDB::DataTypeInt64>(); - } - - if (info.TypeName == TStringRef("float4")) { - return std::make_shared<NDB::DataTypeFloat32>(); - } - - if (info.TypeName == TStringRef("float8")) { - return std::make_shared<NDB::DataTypeFloat64>(); - } - return std::make_shared<NDB::DataTypeString>(); -} - -NDB::DataTypePtr PgMetaToNullableClickHouse(const TPgConvertInfo& info) { - return makeNullable(PgMetaToClickHouse(info)); -} - -NDB::DataTypePtr MetaToClickHouse(const TColumnMeta& meta) { - if (meta.PgConvertInfo) { - return PgMetaToNullableClickHouse(*meta.PgConvertInfo); - } - - if (!meta.Enum.empty()) { - if (meta.Enum.size() < 256) { - NDB::DataTypeEnum8::Values values; - if (!meta.EnumVar.size()) { - for (ui32 i = 0; i < meta.Enum.size(); ++i) { - std::string name(meta.Enum[i]); - values.emplace_back(std::make_pair(name, i8(Min<i8>() + i))); - } - } - else { - for (const auto& x : meta.EnumVar) { - std::string name(x.second.first); - values.emplace_back(std::make_pair(name, i8(x.first))); - } - } - - return std::make_shared<NDB::DataTypeEnum8>(values); - } - else { - NDB::DataTypeEnum16::Values values; - if (!meta.EnumVar.size()) { - for (ui32 i = 0; i < meta.Enum.size(); ++i) { - std::string name(meta.Enum[i]); - values.emplace_back(std::make_pair(name, i16(Min<i16>() + i))); - } - } - else { - for (const auto& x : meta.EnumVar) { - std::string name(x.second.first); - values.emplace_back(std::make_pair(name, x.first)); - } - } - - return std::make_shared<NDB::DataTypeEnum16>(values); - } - } - - if (meta.Aggregation) { - return NDB::DataTypeFactory::instance().get(*meta.Aggregation); - } - - if (meta.IsEmptyList) { - return std::make_shared<NDB::DataTypeArray>(std::make_shared<NDB::DataTypeNothing>()); - } - - if (meta.IsList) { - return std::make_shared<NDB::DataTypeArray>(MetaToClickHouse(meta.Items.front())); - } - - if (meta.IsTuple) { - NDB::DataTypes elems; - for (const auto& e : meta.Items) { - elems.push_back(MetaToClickHouse(e)); - } - - return std::make_shared<NDB::DataTypeTuple>(elems); - } - - NDB::DataTypePtr ret; - if (!meta.Slot) { - ret = makeNullable(std::make_shared<NDB::DataTypeNothing>()); - } - else { - switch (*meta.Slot) { - case EDataSlot::Int8: - ret = std::make_shared<NDB::DataTypeInt8>(); - break; - case EDataSlot::Bool: - case EDataSlot::Uint8: - ret = std::make_shared<NDB::DataTypeUInt8>(); - break; - case EDataSlot::Int16: - ret = std::make_shared<NDB::DataTypeInt16>(); - break; - case EDataSlot::Uint16: - ret = std::make_shared<NDB::DataTypeUInt16>(); - break; - case EDataSlot::Int32: - ret = std::make_shared<NDB::DataTypeInt32>(); - break; - case EDataSlot::Uint32: - ret = std::make_shared<NDB::DataTypeUInt32>(); - break; - case EDataSlot::Int64: - ret = std::make_shared<NDB::DataTypeInt64>(); - break; - case EDataSlot::Uint64: - ret = std::make_shared<NDB::DataTypeUInt64>(); - break; - case EDataSlot::Float: - ret = std::make_shared<NDB::DataTypeFloat32>(); - break; - case EDataSlot::Double: - ret = std::make_shared<NDB::DataTypeFloat64>(); - break; - case EDataSlot::String: - case EDataSlot::Utf8: - case EDataSlot::Json: - ret = std::make_shared<NDB::DataTypeString>(); - break; - case EDataSlot::Date: - case EDataSlot::TzDate: - ret = std::make_shared<NDB::DataTypeDate>(); - break; - case EDataSlot::Datetime: - case EDataSlot::TzDatetime: - ret = std::make_shared<NDB::DataTypeDateTime>(); - break; - case EDataSlot::Uuid: - ret = std::make_shared<NDB::DataTypeUUID>(); - break; - default: - ythrow yexception() << "Unsupported argument type: " << GetDataTypeInfo(*meta.Slot).Name; - } - } - - ret = meta.IsOptional ? makeNullable(ret) : ret; - return ret; -} - -template<bool Forward> -void PermuteUuid(const char* src, char* dst) { - static constexpr ui32 Pairs[16] = { 4, 5, 6, 7, 2, 3, 0, 1, 15, 14, 13, 12, 11, 10, 9, 8 }; - static constexpr ui32 InvPairs[16] = { 6, 7, 4, 5, 0, 1, 2, 3, 15, 14, 13, 12, 11, 10, 9, 8 }; - for (ui32 i = 0; i < 16; ++i) { - dst[Forward ? Pairs[i] : InvPairs[i]] = src[i]; - } -} - -TUnboxedValuePod ConvertOutputValueForPgType(const NDB::IColumn* col, const TPgConvertInfo& meta, - const IValueBuilder* valueBuilder, ssize_t externalIndex) -{ - Y_ENSURE(col->getDataType() == NDB::TypeIndex::Nullable); - NDB::StringRef ref; - if (col->isNullAt(externalIndex)) { - return {}; - } - - auto& pgBuilder = valueBuilder->GetPgBuilder(); - ref = static_cast<const NDB::ColumnNullable*>(col)->getNestedColumn().getDataAt(externalIndex); - if (!meta.SourceLogicalSlot) { - // just parse pg value - TStringValue parseError(""); - if (ref.size > std::numeric_limits<ui32>::max()) { - ythrow yexception() << "Too huge payload to parse pg value: " << ref.size; - } - auto ret = pgBuilder.ValueFromText(meta.TypeId, { ref.data, static_cast<ui32>(ref.size) }, parseError); - if (!ret) { - ythrow yexception() << "Failed to parse value of pg type " << meta.TypeName << ", details: " << TStringBuf(parseError.Data(), parseError.Size()); - } - } - - switch (*meta.SourceLogicalSlot) { - case EDataSlot::Bool: - [[fallthrough]]; - case EDataSlot::Int32: - [[fallthrough]]; - case EDataSlot::Int64: - [[fallthrough]]; - case EDataSlot::Float: - [[fallthrough]]; - case EDataSlot::Double: { - TUnboxedValuePod ret = TUnboxedValuePod::Zero(); - Y_ENSURE(ref.size <= 8); - memcpy(&ret, ref.data, ref.size); - return pgBuilder.ConvertToPg(ret, meta.SourceDataType, meta.TypeId).Release(); - } - case EDataSlot::Utf8: - if (!IsUtf8(std::string_view(ref))) { - ythrow yexception() << "Bad Utf8."; - } - [[fallthrough]]; - case EDataSlot::String: - if (ref.size > std::numeric_limits<ui32>::max()) { - ythrow yexception() << "Too huge payload to build pg string: " << ref.size; - } - - return pgBuilder.NewString(meta.TypeLen, meta.TypeId, { ref.data, static_cast<ui32>(ref.size) }).Release(); - default: - ythrow yexception() << "Unexpected slot " << *meta.SourceLogicalSlot << " for pg type " << meta.TypeName; - } -} - -TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta& meta, ui32 tzId, - const IValueBuilder* valueBuilder, ssize_t externalIndex = 0) { - if (meta.PgConvertInfo) { - return ConvertOutputValueForPgType(col, *meta.PgConvertInfo, valueBuilder, externalIndex); - } - - if (!meta.Enum.empty()) { - auto ref = col->getDataAt(externalIndex); - i16 value; - if (col->getDataType() == NDB::TypeIndex::Int8) { - value = *(const i8*)ref.data; - } - else if (col->getDataType() == NDB::TypeIndex::Int16) { - value = *(const i16*)ref.data; - } - else { - ythrow yexception() << "Unsupported column type: " << col->getName(); - } - - Y_ENSURE(meta.EnumVar.size() == meta.Enum.size()); - const auto x = meta.EnumVar.find(value); - if (x == meta.EnumVar.cend()) { - ythrow yexception() << "Missing enum value: " << value; - } - - const ui32 index = x->second.second; - return valueBuilder->NewVariant(index, TUnboxedValue::Void()).Release(); - } - - if (meta.Aggregation) { - auto field = (*col)[externalIndex]; - const auto& state = field.get<NDB::AggregateFunctionStateData &>(); - if (state.data.size() > std::numeric_limits<ui32>::max()) { - ythrow yexception() << "Too huge aggregation state to put it into string: " << state.data.size(); - } - - return valueBuilder->NewString({ state.data.data(), static_cast<ui32>(state.data.size()) }).Release(); - } - - if (meta.IsEmptyList) { - return valueBuilder->NewEmptyList().Release(); - } - - const NDB::IColumn* data = col; - if (meta.IsList) { - const NDB::ColumnArray& res = static_cast<const NDB::ColumnArray&>(*col); - const NDB::IColumn::Offsets& offsets = res.getOffsets(); - data = &res.getData(); - ui64 start = offsets[externalIndex - 1]; - ui64 limit = offsets[externalIndex] - start; - TUnboxedValue* items = nullptr; - TUnboxedValue ret = valueBuilder->NewArray(limit, items); - for (ui64 index = start; index < start + limit; ++index) { - TUnboxedValue* current = items + index - start; - *current = ConvertOutputValue(data, meta.Items.front(), tzId, valueBuilder, index); - } - - return ret.Release(); - } - - if (meta.IsTuple) { - auto count = meta.Items.size(); - TUnboxedValue* items = nullptr; - TUnboxedValue ret = valueBuilder->NewArray(count, items); - const NDB::ColumnTuple& res = static_cast<const NDB::ColumnTuple&>(*col); - for (ui32 i = 0; i < count; ++i) { - items[i] = ConvertOutputValue(&res.getColumn(i), meta.Items[i], tzId, valueBuilder, externalIndex); - } - - return ret.Release(); - } - - if (!meta.Slot) { - return {}; - } - - NDB::StringRef ref; - if (meta.IsOptional) { - if (data->isNullAt(externalIndex)) { - return {}; - } - - ref = static_cast<const NDB::ColumnNullable*>(data)->getNestedColumn().getDataAt(externalIndex); - } - else { - ref = data->getDataAt(externalIndex); - } - - if (const auto slot = *meta.Slot; slot == EDataSlot::String) { - if (ref.size > std::numeric_limits<ui32>::max()) { - ythrow yexception() << "Too huge payload to build string: " << ref.size; - } - - return valueBuilder->NewString({ ref.data, static_cast<ui32>(ref.size) }).Release(); - } - else if (slot == EDataSlot::Utf8) { - if (!IsUtf8(std::string_view(ref))) { - ythrow yexception() << "Bad Utf8."; - } - if (ref.size > std::numeric_limits<ui32>::max()) { - ythrow yexception() << "Too huge payload to build Utf8 string: " << ref.size; - } - - return valueBuilder->NewString({ ref.data, static_cast<ui32>(ref.size) }).Release(); - } - else if (slot == EDataSlot::Json) { - if (!NDom::IsValidJson(std::string_view(ref))) { - ythrow yexception() << "Bad Json."; - } - if (ref.size > std::numeric_limits<ui32>::max()) { - ythrow yexception() << "Too huge payload to build Json string: " << ref.size; - } - - return valueBuilder->NewString({ ref.data, static_cast<ui32>(ref.size) }).Release(); - } - else if (slot == EDataSlot::Uuid) { - char uuid[16]; - if (ref.size != sizeof(uuid)) { - ythrow yexception() << "Unsupported Uuid of size " << ref.size; - } - PermuteUuid<false>(ref.data, uuid); - return valueBuilder->NewString({ uuid, sizeof(uuid) }).Release(); - } - else if (slot == EDataSlot::Decimal) { - if (NDecimal::TInt128 decimal; ref.size == sizeof(decimal)) { - std::memcpy(&decimal, ref.data, ref.size); - return TUnboxedValuePod(decimal); - } - else - ythrow yexception() << "Unsupported decimal of size " << ref.size; - } - else { - auto size = GetDataTypeInfo(*meta.Slot).FixedSize; - TUnboxedValuePod ret = TUnboxedValuePod::Zero(); - Y_ENSURE(ref.size <= 8); - memcpy(&ret, ref.data, size); - if (tzId) { - if (*meta.Slot == EDataSlot::TzDatetime) { - ret.SetTimezoneId(tzId); - } - else if (*meta.Slot == EDataSlot::TzDate) { - auto& builder = valueBuilder->GetDateBuilder(); - ui32 year, month, day; - if (const auto date = ret.Get<ui16>(); !builder.SplitDate(date, year, month, day)) { - ythrow yexception() << "Error in SplitDate(" << date << ")."; - } - - ui32 datetime; - if (!builder.MakeDatetime(year, month, day, 23u, 59u, 0u, datetime, tzId)) { - ythrow yexception() << "Error in MakeDatetime(" << year << ',' << month << ',' << day << ',' << tzId << ")."; - } - - ret = TUnboxedValuePod(ui16(datetime / 86400u)); - ret.SetTimezoneId(tzId); - } - } - - return ret; - } -} - -void ConvertInputValue(const TUnboxedValuePod& value, NDB::IColumn* col, const TColumnMeta& meta) { - if (meta.IsOptional && !value) - return col->insert(NDB::Field()); - - if (meta.IsEmptyList) { - col->insertDefault(); - return; - } - - if (meta.IsList) { - auto& res = static_cast<NDB::ColumnArray&>(*col); - auto& offsets = res.getOffsets(); - const auto data = &res.getData(); - ui64 count = 0ULL; - if (const auto elements = value.GetElements()) { - for (const auto length = value.GetListLength(); count < length; ++count) - ConvertInputValue(elements[count], data, meta.Items.front()); - } else { - const auto iterator = value.GetListIterator(); - for (TUnboxedValue current; iterator.Next(current); ++count) { - ConvertInputValue(current, data, meta.Items.front()); - } - } - - const auto prev = offsets.back(); - offsets.emplace_back(prev + count); - return; - } - - if (meta.IsTuple) { - auto& res = static_cast<NDB::ColumnTuple&>(*col); - for (ui32 i = 0U; i < meta.Items.size(); ++i) { - const auto current = value.GetElement(i); - ConvertInputValue(current, &res.getColumn(i), meta.Items[i]); - } - - return; - } - - if (!meta.Slot) { - col->insertDefault(); - return; - } - - switch (*meta.Slot) { - case EDataSlot::Utf8: - case EDataSlot::Json: - case EDataSlot::String: - return col->insert(NDB::Field(value.AsStringRef())); - case EDataSlot::Bool: - return col->insert(NDB::Field(value.Get<bool>())); - case EDataSlot::Float: - return col->insert(NDB::Field(value.Get<float>())); - case EDataSlot::Double: - return col->insert(NDB::Field(value.Get<double>())); - case EDataSlot::Uint8: - return col->insert(NDB::Field(value.Get<ui8>())); - case EDataSlot::Uint16: - case EDataSlot::Date: - case EDataSlot::TzDate: - return col->insert(NDB::Field(value.Get<ui16>())); - case EDataSlot::Uint32: - case EDataSlot::Datetime: - case EDataSlot::TzDatetime: - return col->insert(NDB::Field(value.Get<ui32>())); - case EDataSlot::Uint64: - case EDataSlot::Timestamp: - case EDataSlot::TzTimestamp: - return col->insert(NDB::Field(value.Get<ui64>())); - case EDataSlot::Int8: - return col->insert(NDB::Field(value.Get<i8>())); - case EDataSlot::Int16: - return col->insert(NDB::Field(value.Get<i16>())); - case EDataSlot::Int32: - return col->insert(NDB::Field(value.Get<i32>())); - case EDataSlot::Int64: - return col->insert(NDB::Field(value.Get<i64>())); - case EDataSlot::Uuid: { - NDB::UUID uuid; - PermuteUuid<true>(value.AsStringRef().Data(), reinterpret_cast<char*>(&uuid)); - return col->insert(NDB::Field(uuid)); - } - default: - UdfTerminate("TODO: Unsupported field type."); - } -} - -class TParseFromYdb : public TBoxedValue { -public: - class TStreamValue : public TBoxedValue { - public: - TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, - const std::vector<TColumnMeta>& outMeta, - const TSourcePosition& pos, - ui32 tzId) - : ValueBuilder(valueBuilder) - , Stream(stream) - , OutMeta(outMeta) - , Pos(pos) - , TzId(tzId) - , Cache(OutMeta.size()) - {} - - EFetchStatus Fetch(TUnboxedValue& result) final try { - for (;;) { - if (!BlockStream) { - if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status) - return status; - - const std::string_view buffer = Input.AsStringRef(); - Buffer = std::make_unique<NDB::ReadBufferFromMemory>(buffer.data(), buffer.size()); - BlockStream = std::make_unique<NDB::NativeBlockInputStream>(*Buffer, DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD); - } - - if (CurrentRow >= CurrentBlock.rows()) { - CurrentRow = 0; - if (CurrentBlock = BlockStream->read(); !CurrentBlock) { - BlockStream.reset(); - Buffer.reset(); - continue; - } - } - - TUnboxedValue* items = nullptr; - result = Cache.NewArray(*ValueBuilder, items); - for (ui32 i = 0; i < OutMeta.size(); ++i) { - *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow); - } - - ++CurrentRow; - return EFetchStatus::Ok; - } - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } - private: - const IValueBuilder* ValueBuilder; - const TUnboxedValue Stream; - const std::vector<TColumnMeta> OutMeta; // in struct order - const TSourcePosition Pos; - const ui32 TzId; - - TPlainArrayCache Cache; - - TUnboxedValue Input; - std::unique_ptr<NDB::ReadBuffer> Buffer; - std::unique_ptr<NDB::IBlockInputStream> BlockStream; - NDB::Block CurrentBlock; - size_t CurrentRow = 0; - }; - - TParseFromYdb(const TSourcePosition& pos, std::vector<TColumnMeta>&& metaForColumns) - : Pos(pos), OutMeta(std::move(metaForColumns)) - {} - - TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { - ui32 tzId = 0U; - if (const auto& tz = args[1U]) { - if (!valueBuilder->GetDateBuilder().FindTimezoneId(tz.AsStringRef(), tzId)) { - tzId = 0U; - } - } - - return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, OutMeta, Pos, tzId)); - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } -private: - const TSourcePosition Pos; - const std::vector<TColumnMeta> OutMeta; // in struct order -}; - -class TStreamValue : public TBoxedValue { -public: - TStreamValue(const std::string& type, const NDB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream, - const std::vector<TColumnMeta> outMeta, const NDB::ColumnsWithTypeAndName& columns, ui32 tupleSize, const TSourcePosition& pos, ui32 tzId) - : ValueBuilder(valueBuilder) - , Stream(stream) - , OutMeta(outMeta) - , Columns(columns) - , TupleSize(tupleSize) - , Pos(pos) - , TzId(tzId) - , Cache(OutMeta.size()) - , TupleCache(tupleSize) - , Type(type) - , Settings(settings) - {} -private: - EFetchStatus Fetch(TUnboxedValue& result) final try { - for (;;) { - if (!BlockStream) { - if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status) - return status; - - auto input = TupleSize ? Input.GetElement(0) : Input; - const std::string_view buffer = input.AsStringRef(); - Buffer = std::make_unique<NDB::ReadBufferFromMemory>(buffer.data(), buffer.size()); - BlockStream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(Type, *Buffer, NDB::Block(Columns), nullptr, buffer.size(), Settings)); - } - - if (CurrentRow >= CurrentBlock.rows()) { - CurrentRow = 0; - if (CurrentBlock = BlockStream->read(); !CurrentBlock) { - BlockStream.reset(); - Buffer.reset(); - continue; - } - } - - TUnboxedValue* items = nullptr; - result = Cache.NewArray(*ValueBuilder, items); - for (ui32 i = 0; i < OutMeta.size(); ++i) { - *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow); - } - - if (TupleSize) { - TUnboxedValue* tupleItems = nullptr; - auto tuple = TupleCache.NewArray(*ValueBuilder, tupleItems); - *tupleItems++ = result; - for (ui32 i = 1; i < TupleSize; ++i) { - *tupleItems++ = Input.GetElement(i); - } - result = tuple; - } - - ++CurrentRow; - return EFetchStatus::Ok; - } - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } - - const IValueBuilder* ValueBuilder; - const TUnboxedValue Stream; - const std::vector<TColumnMeta> OutMeta; - const NDB::ColumnsWithTypeAndName Columns; - const ui32 TupleSize; - const TSourcePosition Pos; - const ui32 TzId; - - TPlainArrayCache Cache; - TPlainArrayCache TupleCache; - - TUnboxedValue Input; - const TString Type; - const NDB::FormatSettings Settings; - - std::unique_ptr<NDB::ReadBuffer> Buffer; - std::unique_ptr<NDB::IBlockInputStream> BlockStream; - NDB::Block CurrentBlock; - size_t CurrentRow = 0U; -}; - -NDB::FormatSettings GetFormatSettings(const std::string_view& view) { - NDB::FormatSettings settings; - settings.skip_unknown_fields = true; - settings.with_names_use_header = true; - if (!view.empty()) { - const std::string str(view); - const JSON json(str); -#define SUPPORTED_FLAGS(xx) \ - xx(skip_unknown_fields) \ - xx(import_nested_json) \ - xx(with_names_use_header) \ - xx(null_as_default) \ - -#define SET_FLAG(flag) \ - if (json.has(#flag)) \ - settings.flag = json[#flag].get<bool>(); - - SUPPORTED_FLAGS(SET_FLAG) - -#undef SET_FLAG -#undef SUPPORTED_FLAGS - } - return settings; -} - -class TParseFormat : public TBoxedValue { -public: - TParseFormat(const std::string_view& type, const std::string_view& settings, - const TSourcePosition& pos, std::vector<TColumnMeta>&& outMeta, - NDB::ColumnsWithTypeAndName&& columns, ui32 tupleSize) - : Type(type) - , Settings(GetFormatSettings(settings)) - , Pos(pos) - , OutMeta(std::move(outMeta)) - , Columns(std::move(columns)) - , TupleSize(tupleSize) - {} - - TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { - ui32 tzId = 0U; - if (const auto& tz = args[1U]) { - if (!valueBuilder->GetDateBuilder().FindTimezoneId(tz.AsStringRef(), tzId)) { - tzId = 0U; - } - } - - return TUnboxedValuePod(new TStreamValue(Type, Settings, valueBuilder, *args, OutMeta, Columns, TupleSize, Pos, tzId)); - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } -private: - const std::string Type; - const NDB::FormatSettings Settings; - const TSourcePosition Pos; - const std::vector<TColumnMeta> OutMeta; - const NDB::ColumnsWithTypeAndName Columns; - const ui32 TupleSize; -}; - -class TParseBlocks : public TBoxedValue { - class TStreamValue : public TBoxedValue { - public: - TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::vector<TColumnMeta> outMeta, ui32 tupleSize, const TSourcePosition& pos, ui32 tzId) - : ValueBuilder(valueBuilder) - , Stream(stream) - , OutMeta(outMeta) - , TupleSize(tupleSize) - , Pos(pos) - , TzId(tzId) - , Cache(OutMeta.size()) - , TupleCache(tupleSize) - {} - private: - EFetchStatus Fetch(TUnboxedValue& result) final try { - while (true) { - if (!Input) { - if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status) - return status; - } - - const auto block = static_cast<NDB::Block*>(TupleSize ? Input.GetElement(0).GetResource() : - Input.GetResource()); - - if (CurrentRow >= block->rows()) { - CurrentRow = 0; - Input = NUdf::TUnboxedValuePod(); - continue; - } - - TUnboxedValue* items = nullptr; - result = Cache.NewArray(*ValueBuilder, items); - for (ui32 i = 0; i < OutMeta.size(); ++i) { - *items++ = ConvertOutputValue(block->getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow); - } - - if (TupleSize) { - TUnboxedValue* tupleItems = nullptr; - auto tuple = TupleCache.NewArray(*ValueBuilder, tupleItems); - *tupleItems++ = result; - for (ui32 i = 1; i < TupleSize; ++i) { - *tupleItems++ = Input.GetElement(i); - } - result = tuple; - } - - ++CurrentRow; - return EFetchStatus::Ok; - } - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } - - const IValueBuilder* ValueBuilder; - const TUnboxedValue Stream; - const std::vector<TColumnMeta> OutMeta; - const ui32 TupleSize; - const TSourcePosition Pos; - const ui32 TzId; - - TPlainArrayCache Cache; - TPlainArrayCache TupleCache; - - TUnboxedValue Input; - size_t CurrentRow = 0U; - }; - -public: - TParseBlocks(const TSourcePosition& pos, std::vector<TColumnMeta>&& outMeta, ui32 tupleSize) - : Pos(pos), OutMeta(std::move(outMeta)), TupleSize(tupleSize) - {} - - TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { - ui32 tzId = 0U; - if (const auto& tz = args[1U]) { - if (!valueBuilder->GetDateBuilder().FindTimezoneId(tz.AsStringRef(), tzId)) { - tzId = 0U; - } - } - - return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, OutMeta, TupleSize, Pos, tzId)); - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } -private: - const TSourcePosition Pos; - const std::vector<TColumnMeta> OutMeta; - const ui32 TupleSize; -}; - -class TSerializeFormat : public TBoxedValue { - class TStreamValue : public TBoxedValue { - private: - using TPartitionByKey = std::vector<TUnboxedValue, TStdAllocatorForUdf<TUnboxedValue>>; - - struct TPartitionMapStuff { - explicit TPartitionMapStuff(size_t size) : Size(size) {} - - bool operator()(const TPartitionByKey& left, const TPartitionByKey& right) const { - for (auto i = 0U; i < Size; ++i) - if (left[i].AsStringRef() != right[i].AsStringRef()) - return false; - - return true; - } - - size_t operator()(const TPartitionByKey& value) const { - size_t hash = 0ULL; - for (auto i = 0U; i < Size; ++i) - hash = CombineHashes(hash, std::hash<std::string_view>()(value[i].AsStringRef())); - return hash; - } - - const size_t Size; - }; - - using TPartitionByPayload = std::pair<NDB::Block, std::unique_ptr<NDB::IBlockOutputStream>>; - using TPartitionsMap = std::unordered_map<TPartitionByKey, TPartitionByPayload, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, TPartitionByPayload>>>; - public: - TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos) - : ValueBuilder(valueBuilder) - , Stream(stream) - , InMeta(inMeta) - , KeysIndexes(keysIndexes) - , PayloadsIndexes(payloadsIndexes) - , BlockSizeLimit(blockSizeLimit) - , KeysCountLimit(keysCountLimit) - , TotalSizeLimit(totalSizeLimit) - , Pos(pos) - , HeaderBlock(columns) - , Buffer(std::make_unique<NDB::WriteBufferFromOwnString>()) - , Settings(settings) - , Type(type) - , PartitionsMap(0ULL, TPartitionMapStuff(KeysIndexes.size()), TPartitionMapStuff(KeysIndexes.size())) - , Cache(KeysIndexes.size() + 1U) - {} - private: - void FlushKey(const TPartitionsMap::iterator it, TUnboxedValue& result) { - TotalSize -= it->second.first.bytes(); - if (!it->second.second) { - it->second.second = std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(Type, *Buffer, HeaderBlock, nullptr, {}, Settings)); - it->second.second->writePrefix(); - } - it->second.second->write(it->second.first); - it->second.second->writeSuffix(); - it->second.second->flush(); - it->second.second.reset(); - it->second.first = HeaderBlock.cloneEmpty(); - - if (KeysIndexes.empty()) - result = ValueBuilder->NewString(Buffer->str()); - else { - TUnboxedValue* tupleItems = nullptr; - result = Cache.NewArray(*ValueBuilder, tupleItems); - *tupleItems++ = ValueBuilder->NewString(Buffer->str()); - std::copy(it->first.cbegin(), it->first.cend(), tupleItems); - } - - Buffer->restart(); - } - - EFetchStatus Fetch(TUnboxedValue& result) final try { - if (IsFinished && PartitionsMap.empty()) - return EFetchStatus::Finish; - - for (TUnboxedValue row; !IsFinished;) { - switch (const auto status = Stream.Fetch(row)) { - case EFetchStatus::Yield: - return EFetchStatus::Yield; - case EFetchStatus::Ok: { - if (TotalSizeLimit && TotalSizeLimit < TotalSize) { - const auto top = std::max_element(PartitionsMap.begin(), PartitionsMap.end(), - [](const TPartitionsMap::value_type& l, const TPartitionsMap::value_type& r){ return l.second.first.bytes(), r.second.first.bytes(); }); - FlushKey(top, result); - } - - TPartitionByKey keys(KeysIndexes.size()); - for (auto i = 0U; i < keys.size(); ++i) - keys[i] = row.GetElement(KeysIndexes[i]); - - const auto ins = PartitionsMap.emplace(keys, std::make_pair(HeaderBlock.cloneEmpty(), nullptr)); - - if (ins.second && PartitionsMap.size() > KeysCountLimit) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " Too many unique keys: " << PartitionsMap.size()).data()); - } - - const bool flush = !ins.second && ins.first->second.first.bytes() >= BlockSizeLimit; - if (flush) - FlushKey(ins.first, result); - - TotalSize -= ins.first->second.first.bytes(); - auto columns = ins.first->second.first.mutateColumns(); - for (auto i = 0U; i < columns.size(); ++i) { - const auto index = PayloadsIndexes[i]; - ConvertInputValue(row.GetElement(index), columns[i].get(), InMeta[index]); - } - ins.first->second.first.setColumns(std::move(columns)); - TotalSize += ins.first->second.first.bytes(); - - if (flush) - return EFetchStatus::Ok; - else - continue; - } - case EFetchStatus::Finish: - IsFinished = true; - break; - } - } - - if (PartitionsMap.empty()) - return EFetchStatus::Finish; - - FlushKey(PartitionsMap.begin(), result); - PartitionsMap.erase(PartitionsMap.cbegin()); - return EFetchStatus::Ok; - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } - - const IValueBuilder* ValueBuilder; - const TUnboxedValue Stream; - const std::vector<TColumnMeta> InMeta; - const std::vector<ui32> KeysIndexes; - const std::vector<ui32> PayloadsIndexes; - const size_t BlockSizeLimit; - const size_t KeysCountLimit; - const size_t TotalSizeLimit; - const TSourcePosition Pos; - - const NDB::Block HeaderBlock; - const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer; - const NDB::FormatSettings Settings; - const std::string Type; - - TPartitionsMap PartitionsMap; - - TPlainArrayCache Cache; - - bool IsFinished = false; - size_t TotalSize = 0ULL; - }; -public: - TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns) - : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizeLimit), KeysCountLimit(keysCountLimit), TotalSizeLimit(totalSizeLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns)) - {} - - TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { - return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, TotalSizeLimit, InMeta, Columns, Pos)); - } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); - } -private: - const std::string Type; - const NDB::FormatSettings Settings; - const std::vector<ui32> KeysIndexes; - const std::vector<ui32> PayloadsIndexes; - const size_t BlockSizeLimit; - const size_t KeysCountLimit; - const size_t TotalSizeLimit; - const TSourcePosition Pos; - - const std::vector<TColumnMeta> InMeta; - const NDB::ColumnsWithTypeAndName Columns; -}; - -struct TCHInitializer { - using TWeakPtr = std::weak_ptr<TCHInitializer>; - using TPtr = std::shared_ptr<TCHInitializer>; - - TCHInitializer() - { - NDB::registerFormats(); - } -}; - -template <typename T> -TString MakeEnumImpl(const T& values) { - TStringBuilder str; - str << "Enum<"; - bool first = true; - for (const auto& value : values) { - if (!first) { - str << ','; - } - else { - first = false; - } - - str << "'" << value.first << "'"; - } - - str << '>'; - return str; -} - -std::optional<TString> MakeYqlType(NDB::DataTypePtr type, bool validTz) { - if (type->getTypeId() == NDB::TypeIndex::Enum8) { - const NDB::DataTypeEnum8* enum8 = NDB::checkAndGetDataType<NDB::DataTypeEnum8>(type.get()); - return MakeEnumImpl(enum8->getValues()); - } - - if (type->getTypeId() == NDB::TypeIndex::Enum16) { - const NDB::DataTypeEnum16* enum16 = NDB::checkAndGetDataType<NDB::DataTypeEnum16>(type.get()); - return MakeEnumImpl(enum16->getValues()); - } - - if (type->getTypeId() == NDB::TypeIndex::AggregateFunction) { - return "Tagged<String,'" + TString(type->getName()) + "'>"; - } - - if (type->getTypeId() == NDB::TypeIndex::Array) { - const NDB::DataTypeArray* array = NDB::checkAndGetDataType<NDB::DataTypeArray>(type.get()); - type = array->getNestedType(); - if (type->getTypeId() == NDB::TypeIndex::Nothing) { - return "EmptyList"; - } - - auto inner = MakeYqlType(type, validTz); - if (!inner) { - return std::nullopt; - } - - return "List<" + *inner + '>'; - } - - if (type->getTypeId() == NDB::TypeIndex::Tuple) { - const NDB::DataTypeTuple* tuple = NDB::checkAndGetDataType<NDB::DataTypeTuple>(type.get()); - const auto& elems = tuple->getElements(); - TStringBuilder str; - str << "Tuple<"; - bool first = true; - for (const auto& e : elems) { - auto inner = MakeYqlType(e, validTz); - if (!inner) { - return std::nullopt; - } - - if (!first) { - str << ','; - } else { - first = false; - } - - str << *inner; - } - - str << '>'; - return str; - } - - if (type->isNullable()) { - type = removeNullable(type); - if (type->getTypeId() == NDB::TypeIndex::Nothing) { - return "Null"; - } - - auto inner = MakeYqlType(type, validTz); - if (!inner) { - return std::nullopt; - } - - return "Optional<" + *inner + '>'; - } - - if (type->getTypeId() == NDB::TypeIndex::UInt8) return "Uint8"; - else if (type->getTypeId() == NDB::TypeIndex::Int8) return "Int8"; - else if (type->getTypeId() == NDB::TypeIndex::UInt16) return "Uint16"; - else if (type->getTypeId() == NDB::TypeIndex::Int16) return "Int16"; - else if (type->getTypeId() == NDB::TypeIndex::UInt32) return "Uint32"; - else if (type->getTypeId() == NDB::TypeIndex::Int32) return "Int32"; - else if (type->getTypeId() == NDB::TypeIndex::UInt64) return "Uint64"; - else if (type->getTypeId() == NDB::TypeIndex::Int64) return "Int64"; - else if (type->getTypeId() == NDB::TypeIndex::Float32) return "Float"; - else if (type->getTypeId() == NDB::TypeIndex::Float64) return "Double"; - else if (type->getTypeId() == NDB::TypeIndex::String) return "String"; - else if (type->getTypeId() == NDB::TypeIndex::FixedString) return "String"; - else if (validTz && type->getTypeId() == NDB::TypeIndex::Date) return "TzDate"; - else if (validTz && type->getTypeId() == NDB::TypeIndex::DateTime) return "TzDatetime"; - else if (type->getTypeId() == NDB::TypeIndex::UUID) return "Uuid"; - else return std::nullopt; -} - -SIMPLE_UDF(TToYqlType, TOptional<TUtf8>(TUtf8, TUtf8)) { - const auto ref = args[0].AsStringRef(); - const auto tzRef = args[1].AsStringRef(); - ui32 tzId; - const bool validTz = valueBuilder->GetDateBuilder().FindTimezoneId(tzRef, tzId); - const NDB::String typeStr(ref.Data(), ref.Data() + ref.Size()); - const auto type = NDB::DataTypeFactory::instance().get(typeStr); - if (const auto ret = MakeYqlType(type, validTz)) { - return valueBuilder->NewString(*ret); - } - return TUnboxedValue(); -} - -} - -class TClickHouseClientModule : public IUdfModule -{ -public: - TClickHouseClientModule() = default; - - static const TStringRef& Name() { - static constexpr auto name = TStringRef::Of("ClickHouseClient"); - return name; - } - - void CleanupOnTerminate() const final {} - - void GetAllFunctions(IFunctionsSink& sink) const final { - sink.Add(TStringRef::Of("ToYqlType")); - sink.Add(TStringRef::Of("ParseFormat"))->SetTypeAwareness(); - sink.Add(TStringRef::Of("ParseBlocks"))->SetTypeAwareness(); - sink.Add(TStringRef::Of("ParseFromYdb"))->SetTypeAwareness(); - sink.Add(TStringRef::Of("SerializeFormat"))->SetTypeAwareness(); - } - - void BuildFunctionTypeInfo( - const TStringRef& name, - TType* userType, - const TStringRef& typeConfig, - ui32 flags, - IFunctionTypeInfoBuilder& builder) const final try { - LazyInitContext(); - auto argBuilder = builder.Args(); - if (name == "ToYqlType") { - argBuilder->Add<TUtf8>(); - argBuilder->Add<TUtf8>(); - argBuilder->Done(); - builder.Returns<TOptional<TUtf8>>(); - if (!(flags & TFlags::TypesOnly)) { - builder.Implementation(new TToYqlType(builder)); - } - - return; - } - - const auto typeHelper = builder.TypeInfoHelper(); - if (name == "ParseBlocks") { - const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); - if (!userTypeInspector || userTypeInspector.GetElementsCount() < 3) { - return builder.SetError("Invalid user type."); - } - - const auto argsTypeTuple = userTypeInspector.GetElementType(0); - const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple); - if (!argsTypeInspector) { - return builder.SetError("Invalid user type - expected tuple."); - } - - if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount < 1 || argsCount > 2) { - ::TStringBuilder sb; - sb << "Invalid user type - expected one or two arguments, got: " << argsCount; - return builder.SetError(sb); - } - - ui32 tupleSize = 0; - TType* inputItemType = builder.Resource("ClickHouseClient.Block"); - - const auto resultType = userTypeInspector.GetElementType(2); - auto structType = TStructTypeInspector(*typeHelper, resultType); - if (!structType) { - if (const auto tupleType = TTupleTypeInspector(*typeHelper, resultType)) { - tupleSize = tupleType.GetElementsCount(); - if (tupleSize > 0) { - structType = TStructTypeInspector(*typeHelper, tupleType.GetElementType(0)); - auto tupleBuilder = builder.Tuple(tupleSize); - tupleBuilder->Add(inputItemType); - for (ui32 i = 1; i < tupleSize; ++i) { - tupleBuilder->Add(tupleType.GetElementType(i)); - } - inputItemType = tupleBuilder->Build(); - } - } - } - - builder.UserType(userType); - builder.Args()->Add(builder.Stream()->Item(inputItemType)).Add<TOptional<TUtf8>>().Done(); - builder.OptionalArgs(1U); - builder.Returns(builder.Stream()->Item(resultType)); - - if (structType) { - std::vector<TColumnMeta> outMeta(structType.GetMembersCount()); - for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) { - if (auto& meta = outMeta[i]; !GetDataType(builder, *typeHelper, structType.GetMemberType(i), meta)) { - ::TStringBuilder sb; - sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: "; - TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out); - return builder.SetError(sb); - } - } - - if (!(flags & TFlags::TypesOnly)) { - builder.Implementation(new TParseBlocks(builder.GetSourcePosition(), std::move(outMeta), tupleSize)); - } - return; - } else { - ::TStringBuilder sb; - sb << "Incompatible row type: "; - TTypePrinter(*typeHelper, resultType).Out(sb.Out); - return builder.SetError(sb); - } - } else if (name == "ParseFormat") { - const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); - if (!userTypeInspector || userTypeInspector.GetElementsCount() < 3) { - return builder.SetError("Invalid user type."); - } - - const auto argsTypeTuple = userTypeInspector.GetElementType(0); - const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple); - if (!argsTypeInspector) { - return builder.SetError("Invalid user type - expected tuple."); - } - - if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount < 1 || argsCount > 2) { - ::TStringBuilder sb; - sb << "Invalid user type - expected one or two arguments, got: " << argsCount; - return builder.SetError(sb); - } - - ui32 tupleSize = 0; - TType* inputItemType = builder.Primitive(GetDataTypeInfo(EDataSlot::String).TypeId); - - const auto resultType = userTypeInspector.GetElementType(2); - auto structType = TStructTypeInspector(*typeHelper, resultType); - if (!structType) { - if (const auto tupleType = TTupleTypeInspector(*typeHelper, resultType)) { - tupleSize = tupleType.GetElementsCount(); - if (tupleSize > 0) { - structType = TStructTypeInspector(*typeHelper, tupleType.GetElementType(0)); - auto tupleBuilder = builder.Tuple(tupleSize); - tupleBuilder->Add(inputItemType); - for (ui32 i = 1; i < tupleSize; ++i) { - tupleBuilder->Add(tupleType.GetElementType(i)); - } - inputItemType = tupleBuilder->Build(); - } - } - } - - builder.UserType(userType); - builder.Args()->Add(builder.Stream()->Item(inputItemType)).Add<TOptional<TUtf8>>().Done(); - builder.OptionalArgs(1U); - builder.Returns(builder.Stream()->Item(resultType)); - - if (structType) { - std::vector<TColumnMeta> outMeta(structType.GetMembersCount()); - NDB::ColumnsWithTypeAndName columns(structType.GetMembersCount()); - for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) { - if (auto& meta = outMeta[i]; GetDataType(builder, *typeHelper, structType.GetMemberType(i), meta)) { - auto& colsumn = columns[i]; - colsumn.type = MetaToClickHouse(meta); - colsumn.name = structType.GetMemberName(i); - } else { - ::TStringBuilder sb; - sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: "; - TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out); - return builder.SetError(sb); - } - } - - if (!(flags & TFlags::TypesOnly)) { - const std::string_view& typeCfg = typeConfig; - const auto jsonFrom = typeCfg.find('{'); - builder.Implementation(new TParseFormat(typeCfg.substr(0U, jsonFrom), - std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), - builder.GetSourcePosition(), - std::move(outMeta), - std::move(columns), - tupleSize)); - } - return; - } else { - ::TStringBuilder sb; - sb << "Incompatible row type: "; - TTypePrinter(*typeHelper, resultType).Out(sb.Out); - return builder.SetError(sb); - } - } else if (name == "SerializeFormat") { - const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); - if (!userTypeInspector || userTypeInspector.GetElementsCount() < 1) { - return builder.SetError("Invalid user type."); - } - - const auto argsTypeTuple = userTypeInspector.GetElementType(0); - const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple); - if (!argsTypeInspector) { - return builder.SetError("Invalid user type - expected tuple."); - } - - if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount != 1) { - ::TStringBuilder sb; - sb << "Invalid user type - expected one or two arguments, got: " << argsCount; - return builder.SetError(sb); - } - - const auto inputType = argsTypeInspector.GetElementType(0); - - const std::string_view& typeCfg = typeConfig; - const auto jsonFrom = typeCfg.find('{'); - std::vector<std::string> keys; - const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom); - size_t blockSizeLimit = 256_MB; - size_t keysCountLimit = 4096; - size_t totalSizeLimit = 1_GB; - if (!tail.empty()) { - const std::string str(tail); - const JSON json(str); - if (json.has("keys")) { - const auto& list = json["keys"]; - keys.reserve(list.size()); - std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString)); - } - if (json.has("block_size_limit")) { - blockSizeLimit = json["block_size_limit"].getUInt(); - } - if (json.has("keys_count_limit")) { - keysCountLimit = json["keys_count_limit"].getUInt(); - } - if (json.has("total_size_limit")) { - totalSizeLimit = json["total_size_limit"].getUInt(); - } - } - - builder.UserType(userType); - builder.Args()->Add(inputType).Done(); - if (keys.empty()) - builder.Returns(builder.Stream()->Item<const char*>()); - else { - const auto tuple = builder.Tuple(); - tuple->Add<const char*>(); - for (auto k =0U; k < keys.size(); ++k) - tuple->Add<TUtf8>(); - builder.Returns(builder.Stream()->Item(tuple->Build())); - } - - if (const auto structType = TStructTypeInspector(*typeHelper, TStreamTypeInspector(*typeHelper, inputType).GetItemType())) { - std::vector<TColumnMeta> inMeta(structType.GetMembersCount()); - std::vector<ui32> keyIndexes(keys.size()), payloadIndexes(structType.GetMembersCount() - keys.size()); - NDB::ColumnsWithTypeAndName columns; - columns.reserve(payloadIndexes.size()); - for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) { - if (auto& meta = inMeta[i]; GetDataType(builder, *typeHelper, structType.GetMemberType(i), meta)) { - const std::string_view name = structType.GetMemberName(i); - bool payload = true; - for (ui32 k = 0U; k < keys.size(); ++k) { - if (keys[k] == name) { - keyIndexes[k] = i; - payload = false; - } - } - - if (payload) { - payloadIndexes[columns.size()] = i; - columns.emplace_back(MetaToClickHouse(meta), std::string(name)); - } - } else { - ::TStringBuilder sb; - sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: "; - TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out); - return builder.SetError(sb); - } - } - - if (!(flags & TFlags::TypesOnly)) { - builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, totalSizeLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns))); - } - return; - } else { - ::TStringBuilder sb; - sb << "Incompatible row type: "; - TTypePrinter(*typeHelper, inputType).Out(sb.Out); - return builder.SetError(sb); - } - } else if (name == "ParseFromYdb") { - builder.UserType(userType); - builder.Args()->Add(builder.Stream()->Item<char*>()).Add<TOptional<TUtf8>>().Done(); - builder.OptionalArgs(1U); - builder.Returns(builder.Stream()->Item(userType)); - - if (const auto structType = TStructTypeInspector(*typeHelper, userType)) { - std::vector<TColumnMeta> columns(structType.GetMembersCount()); - for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) { - if (const auto dataType = TDataAndDecimalTypeInspector(*typeHelper, TOptionalTypeInspector(*typeHelper, structType.GetMemberType(i)).GetItemType())) { - auto& meta = columns[i]; - meta.Slot = GetDataSlot(dataType.GetTypeId()); - meta.IsOptional = true; - meta.Precision = dataType.GetPrecision(); - meta.Scale = dataType.GetScale(); - } else { - ::TStringBuilder sb; - sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: "; - TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out); - return builder.SetError(sb); - } - } - - if (!(flags & TFlags::TypesOnly)) { - builder.Implementation(new TParseFromYdb(builder.GetSourcePosition(), std::move(columns))); - } - return; - } else { - ::TStringBuilder sb; - sb << "Incompatible row type: "; - TTypePrinter(*typeHelper, userType).Out(sb.Out); - return builder.SetError(sb); - } - } - } - catch (const Poco::Exception& e) { - builder.SetError(e.displayText()); - } - catch (const std::exception& e) { - builder.SetError(TStringBuf(e.what())); - } -private: - void LazyInitContext() const { - const std::unique_lock lock(CtxMutex); - if (!Ctx) { - if (auto ctx = StaticCtx.lock()) { - Ctx = std::move(ctx); - } else { - StaticCtx = Ctx = std::make_shared<TCHInitializer>(); - } - } - } - - static std::mutex CtxMutex; - static TCHInitializer::TWeakPtr StaticCtx; - mutable TCHInitializer::TPtr Ctx; -}; - -std::mutex TClickHouseClientModule::CtxMutex; -TCHInitializer::TWeakPtr TClickHouseClientModule::StaticCtx; - -REGISTER_MODULES(TClickHouseClientModule); +#include <ydb/library/yql/public/udf/udf_helpers.h>
+#include <ydb/library/yql/public/udf/udf_type_printer.h>
+#include <ydb/library/yql/utils/utf8.h>
+#include <ydb/library/yql/minikql/dom/json.h>
+
+#include <Poco/Util/Application.h>
+
+#include "base/common/JSON.h"
+
+#include "src/Columns/ColumnAggregateFunction.h"
+#include "src/Columns/ColumnArray.h"
+#include "src/Columns/ColumnConst.h"
+#include "src/Columns/ColumnNullable.h"
+#include "src/Columns/ColumnTuple.h"
+#include "src/Columns/ColumnsNumber.h"
+#include "src/Columns/ColumnString.h"
+
+#include "src/DataStreams/NativeBlockInputStream.h"
+
+#include "src/DataTypes/DataTypeEnum.h"
+#include "src/DataTypes/DataTypesNumber.h"
+#include "src/DataTypes/DataTypeDate.h"
+#include "src/DataTypes/DataTypeFactory.h"
+#include "src/DataTypes/DataTypeArray.h"
+#include "src/DataTypes/DataTypeNothing.h"
+#include "src/DataTypes/DataTypeTuple.h"
+#include "src/DataTypes/DataTypeNullable.h"
+#include "src/DataTypes/DataTypeString.h"
+#include "src/DataTypes/DataTypeUUID.h"
+
+#include "src/Core/Block.h"
+#include "src/Core/ColumnsWithTypeAndName.h"
+
+#include "src/Formats/FormatFactory.h"
+#include "src/Formats/registerFormats.h"
+#include "src/Processors/Formats/InputStreamFromInputFormat.h"
+#include "src/Processors/Formats/OutputStreamToOutputFormat.h"
+
+#include <util/generic/size_literals.h>
+#include <util/generic/yexception.h>
+#include <util/string/split.h>
+
+using namespace NYql;
+using namespace NUdf;
+
+namespace {
+struct TPgConvertInfo {
+ ui32 TypeId = 0;
+ TStringRef TypeName;
+ i32 TypeLen = 0;
+ const TType* SourceDataType = nullptr;
+ std::optional<EDataSlot> SourceLogicalSlot; // empty means that there is not corresponding type for pg type, e.g. point
+};
+
+struct TColumnMeta {
+ std::optional<TPgConvertInfo> PgConvertInfo;
+ std::optional<TString> Aggregation;
+ std::optional<EDataSlot> Slot; // empty if null type
+ bool IsOptional = false;
+ bool IsEmptyList = false;
+ bool IsList = false;
+ bool IsTuple = false;
+ ui8 Precision = 0, Scale = 0; // filled for Decimal
+ std::vector<TColumnMeta> Items; // filled if IsTuple or IsList
+ std::vector<TString> Enum;
+ std::unordered_map<i16, std::pair<TString, ui32>> EnumVar;
+};
+
+std::pair<std::optional<EDataSlot>, TType*> InferSlotFromPgType(IFunctionTypeInfoBuilder& typeInfoBuilder, const TStringRef typeName) {
+ if (typeName == TStringRef("bool")) {
+ return { EDataSlot::Bool, typeInfoBuilder.SimpleType<bool>() };
+ }
+ if (typeName == TStringRef("int4")) {
+ return { EDataSlot::Int32, typeInfoBuilder.SimpleType<i32>() };
+ }
+ if (typeName == TStringRef("int8")) {
+ return { EDataSlot::Int64, typeInfoBuilder.SimpleType<i64>() };
+ }
+ if (typeName == TStringRef("float4")) {
+ return { EDataSlot::Float, typeInfoBuilder.SimpleType<float>() };
+ }
+ if (typeName == TStringRef("float8")) {
+ return { EDataSlot::Double, typeInfoBuilder.SimpleType<double>() };
+ }
+ if (typeName == TStringRef("text")) {
+ return { EDataSlot::Utf8, typeInfoBuilder.SimpleType<TUtf8>() };
+ }
+ if (IsIn({ TStringRef("cstring"), TStringRef("varchar"), TStringRef("bytea") }, typeName)) {
+ return { EDataSlot::String, nullptr };
+ }
+ return {};
+}
+
+TPgConvertInfo GetPgConvertInfo(IFunctionTypeInfoBuilder& typeInfoBuilder, const ITypeInfoHelper& typeHelper, const TType* type) {
+ TPgConvertInfo info;
+ info.TypeId = TPgTypeInspector(typeHelper, type).GetTypeId();
+ auto* pgDescription = typeHelper.FindPgTypeDescription(info.TypeId);
+ Y_ENSURE(pgDescription);
+ info.TypeLen = pgDescription->Typelen;
+ info.TypeName = pgDescription->Name;
+
+ std::tie(info.SourceLogicalSlot, info.SourceDataType) = InferSlotFromPgType(typeInfoBuilder, pgDescription->Name);
+ if (info.SourceDataType) {
+ info.SourceDataType = typeInfoBuilder.Optional()->Item(info.SourceDataType).Build();
+ }
+ return info;
+}
+
+template<bool MaybeOptional = true>
+bool GetDataType(IFunctionTypeInfoBuilder& typeInfoBuilder, const ITypeInfoHelper& typeHelper, const TType* type, TColumnMeta& meta) {
+ switch (typeHelper.GetTypeKind(type)) {
+ case ETypeKind::Tuple: {
+ meta.IsTuple = true;
+ const TTupleTypeInspector tupleType(typeHelper, type);
+ meta.Items.resize(tupleType.GetElementsCount());
+ for (auto i = 0U; i < meta.Items.size(); ++i)
+ if (!GetDataType(typeInfoBuilder, typeHelper, tupleType.GetElementType(i), meta.Items[i]))
+ return false;
+ return true;
+ }
+ case ETypeKind::List:
+ meta.IsList = true;
+ type = TListTypeInspector(typeHelper, type).GetItemType();
+ meta.Items.resize(1U);
+ return GetDataType(typeInfoBuilder, typeHelper, type, meta.Items.front());
+ case ETypeKind::Optional:
+ if constexpr (MaybeOptional) {
+ meta.IsOptional = true;
+ type = TOptionalTypeInspector(typeHelper, type).GetItemType();
+ return GetDataType<false>(typeInfoBuilder, typeHelper, type, meta);
+ }
+ else
+ break;
+ case ETypeKind::Data: {
+ const TDataAndDecimalTypeInspector dataType(typeHelper, type);
+ meta.Slot = GetDataSlot(dataType.GetTypeId());
+ meta.Precision = dataType.GetPrecision();
+ meta.Scale = dataType.GetScale();
+ return true;
+ }
+ case ETypeKind::Pg: {
+ meta.PgConvertInfo = GetPgConvertInfo(typeInfoBuilder, typeHelper, type);
+ return true;
+ }
+ default:
+ break;
+ }
+ return false;
+}
+
+NDB::DataTypePtr PgMetaToClickHouse(const TPgConvertInfo& info) {
+ if (info.TypeName == TStringRef("bool")) {
+ return std::make_shared<NDB::DataTypeUInt8>();
+ }
+
+ if (info.TypeName == TStringRef("int4")) {
+ return std::make_shared<NDB::DataTypeInt32>();
+ }
+
+ if (info.TypeName == TStringRef("int8")) {
+ return std::make_shared<NDB::DataTypeInt64>();
+ }
+
+ if (info.TypeName == TStringRef("float4")) {
+ return std::make_shared<NDB::DataTypeFloat32>();
+ }
+
+ if (info.TypeName == TStringRef("float8")) {
+ return std::make_shared<NDB::DataTypeFloat64>();
+ }
+ return std::make_shared<NDB::DataTypeString>();
+}
+
+NDB::DataTypePtr PgMetaToNullableClickHouse(const TPgConvertInfo& info) {
+ return makeNullable(PgMetaToClickHouse(info));
+}
+
+NDB::DataTypePtr MetaToClickHouse(const TColumnMeta& meta) {
+ if (meta.PgConvertInfo) {
+ return PgMetaToNullableClickHouse(*meta.PgConvertInfo);
+ }
+
+ if (!meta.Enum.empty()) {
+ if (meta.Enum.size() < 256) {
+ NDB::DataTypeEnum8::Values values;
+ if (!meta.EnumVar.size()) {
+ for (ui32 i = 0; i < meta.Enum.size(); ++i) {
+ std::string name(meta.Enum[i]);
+ values.emplace_back(std::make_pair(name, i8(Min<i8>() + i)));
+ }
+ }
+ else {
+ for (const auto& x : meta.EnumVar) {
+ std::string name(x.second.first);
+ values.emplace_back(std::make_pair(name, i8(x.first)));
+ }
+ }
+
+ return std::make_shared<NDB::DataTypeEnum8>(values);
+ }
+ else {
+ NDB::DataTypeEnum16::Values values;
+ if (!meta.EnumVar.size()) {
+ for (ui32 i = 0; i < meta.Enum.size(); ++i) {
+ std::string name(meta.Enum[i]);
+ values.emplace_back(std::make_pair(name, i16(Min<i16>() + i)));
+ }
+ }
+ else {
+ for (const auto& x : meta.EnumVar) {
+ std::string name(x.second.first);
+ values.emplace_back(std::make_pair(name, x.first));
+ }
+ }
+
+ return std::make_shared<NDB::DataTypeEnum16>(values);
+ }
+ }
+
+ if (meta.Aggregation) {
+ return NDB::DataTypeFactory::instance().get(*meta.Aggregation);
+ }
+
+ if (meta.IsEmptyList) {
+ return std::make_shared<NDB::DataTypeArray>(std::make_shared<NDB::DataTypeNothing>());
+ }
+
+ if (meta.IsList) {
+ return std::make_shared<NDB::DataTypeArray>(MetaToClickHouse(meta.Items.front()));
+ }
+
+ if (meta.IsTuple) {
+ NDB::DataTypes elems;
+ for (const auto& e : meta.Items) {
+ elems.push_back(MetaToClickHouse(e));
+ }
+
+ return std::make_shared<NDB::DataTypeTuple>(elems);
+ }
+
+ NDB::DataTypePtr ret;
+ if (!meta.Slot) {
+ ret = makeNullable(std::make_shared<NDB::DataTypeNothing>());
+ }
+ else {
+ switch (*meta.Slot) {
+ case EDataSlot::Int8:
+ ret = std::make_shared<NDB::DataTypeInt8>();
+ break;
+ case EDataSlot::Bool:
+ case EDataSlot::Uint8:
+ ret = std::make_shared<NDB::DataTypeUInt8>();
+ break;
+ case EDataSlot::Int16:
+ ret = std::make_shared<NDB::DataTypeInt16>();
+ break;
+ case EDataSlot::Uint16:
+ ret = std::make_shared<NDB::DataTypeUInt16>();
+ break;
+ case EDataSlot::Int32:
+ ret = std::make_shared<NDB::DataTypeInt32>();
+ break;
+ case EDataSlot::Uint32:
+ ret = std::make_shared<NDB::DataTypeUInt32>();
+ break;
+ case EDataSlot::Int64:
+ ret = std::make_shared<NDB::DataTypeInt64>();
+ break;
+ case EDataSlot::Uint64:
+ ret = std::make_shared<NDB::DataTypeUInt64>();
+ break;
+ case EDataSlot::Float:
+ ret = std::make_shared<NDB::DataTypeFloat32>();
+ break;
+ case EDataSlot::Double:
+ ret = std::make_shared<NDB::DataTypeFloat64>();
+ break;
+ case EDataSlot::String:
+ case EDataSlot::Utf8:
+ case EDataSlot::Json:
+ ret = std::make_shared<NDB::DataTypeString>();
+ break;
+ case EDataSlot::Date:
+ case EDataSlot::TzDate:
+ ret = std::make_shared<NDB::DataTypeDate>();
+ break;
+ case EDataSlot::Datetime:
+ case EDataSlot::TzDatetime:
+ ret = std::make_shared<NDB::DataTypeDateTime>();
+ break;
+ case EDataSlot::Uuid:
+ ret = std::make_shared<NDB::DataTypeUUID>();
+ break;
+ default:
+ ythrow yexception() << "Unsupported argument type: " << GetDataTypeInfo(*meta.Slot).Name;
+ }
+ }
+
+ ret = meta.IsOptional ? makeNullable(ret) : ret;
+ return ret;
+}
+
+template<bool Forward>
+void PermuteUuid(const char* src, char* dst) {
+ static constexpr ui32 Pairs[16] = { 4, 5, 6, 7, 2, 3, 0, 1, 15, 14, 13, 12, 11, 10, 9, 8 };
+ static constexpr ui32 InvPairs[16] = { 6, 7, 4, 5, 0, 1, 2, 3, 15, 14, 13, 12, 11, 10, 9, 8 };
+ for (ui32 i = 0; i < 16; ++i) {
+ dst[Forward ? Pairs[i] : InvPairs[i]] = src[i];
+ }
+}
+
+TUnboxedValuePod ConvertOutputValueForPgType(const NDB::IColumn* col, const TPgConvertInfo& meta,
+ const IValueBuilder* valueBuilder, ssize_t externalIndex)
+{
+ Y_ENSURE(col->getDataType() == NDB::TypeIndex::Nullable);
+ NDB::StringRef ref;
+ if (col->isNullAt(externalIndex)) {
+ return {};
+ }
+
+ auto& pgBuilder = valueBuilder->GetPgBuilder();
+ ref = static_cast<const NDB::ColumnNullable*>(col)->getNestedColumn().getDataAt(externalIndex);
+ if (!meta.SourceLogicalSlot) {
+ // just parse pg value
+ TStringValue parseError("");
+ if (ref.size > std::numeric_limits<ui32>::max()) {
+ ythrow yexception() << "Too huge payload to parse pg value: " << ref.size;
+ }
+ auto ret = pgBuilder.ValueFromText(meta.TypeId, { ref.data, static_cast<ui32>(ref.size) }, parseError);
+ if (!ret) {
+ ythrow yexception() << "Failed to parse value of pg type " << meta.TypeName << ", details: " << TStringBuf(parseError.Data(), parseError.Size());
+ }
+ }
+
+ switch (*meta.SourceLogicalSlot) {
+ case EDataSlot::Bool:
+ [[fallthrough]];
+ case EDataSlot::Int32:
+ [[fallthrough]];
+ case EDataSlot::Int64:
+ [[fallthrough]];
+ case EDataSlot::Float:
+ [[fallthrough]];
+ case EDataSlot::Double: {
+ TUnboxedValuePod ret = TUnboxedValuePod::Zero();
+ Y_ENSURE(ref.size <= 8);
+ memcpy(&ret, ref.data, ref.size);
+ return pgBuilder.ConvertToPg(ret, meta.SourceDataType, meta.TypeId).Release();
+ }
+ case EDataSlot::Utf8:
+ if (!IsUtf8(std::string_view(ref))) {
+ ythrow yexception() << "Bad Utf8.";
+ }
+ [[fallthrough]];
+ case EDataSlot::String:
+ if (ref.size > std::numeric_limits<ui32>::max()) {
+ ythrow yexception() << "Too huge payload to build pg string: " << ref.size;
+ }
+
+ return pgBuilder.NewString(meta.TypeLen, meta.TypeId, { ref.data, static_cast<ui32>(ref.size) }).Release();
+ default:
+ ythrow yexception() << "Unexpected slot " << *meta.SourceLogicalSlot << " for pg type " << meta.TypeName;
+ }
+}
+
+TUnboxedValuePod ConvertOutputValue(const NDB::IColumn* col, const TColumnMeta& meta, ui32 tzId,
+ const IValueBuilder* valueBuilder, ssize_t externalIndex = 0) {
+ if (meta.PgConvertInfo) {
+ return ConvertOutputValueForPgType(col, *meta.PgConvertInfo, valueBuilder, externalIndex);
+ }
+
+ if (!meta.Enum.empty()) {
+ auto ref = col->getDataAt(externalIndex);
+ i16 value;
+ if (col->getDataType() == NDB::TypeIndex::Int8) {
+ value = *(const i8*)ref.data;
+ }
+ else if (col->getDataType() == NDB::TypeIndex::Int16) {
+ value = *(const i16*)ref.data;
+ }
+ else {
+ ythrow yexception() << "Unsupported column type: " << col->getName();
+ }
+
+ Y_ENSURE(meta.EnumVar.size() == meta.Enum.size());
+ const auto x = meta.EnumVar.find(value);
+ if (x == meta.EnumVar.cend()) {
+ ythrow yexception() << "Missing enum value: " << value;
+ }
+
+ const ui32 index = x->second.second;
+ return valueBuilder->NewVariant(index, TUnboxedValue::Void()).Release();
+ }
+
+ if (meta.Aggregation) {
+ auto field = (*col)[externalIndex];
+ const auto& state = field.get<NDB::AggregateFunctionStateData &>();
+ if (state.data.size() > std::numeric_limits<ui32>::max()) {
+ ythrow yexception() << "Too huge aggregation state to put it into string: " << state.data.size();
+ }
+
+ return valueBuilder->NewString({ state.data.data(), static_cast<ui32>(state.data.size()) }).Release();
+ }
+
+ if (meta.IsEmptyList) {
+ return valueBuilder->NewEmptyList().Release();
+ }
+
+ const NDB::IColumn* data = col;
+ if (meta.IsList) {
+ const NDB::ColumnArray& res = static_cast<const NDB::ColumnArray&>(*col);
+ const NDB::IColumn::Offsets& offsets = res.getOffsets();
+ data = &res.getData();
+ ui64 start = offsets[externalIndex - 1];
+ ui64 limit = offsets[externalIndex] - start;
+ TUnboxedValue* items = nullptr;
+ TUnboxedValue ret = valueBuilder->NewArray(limit, items);
+ for (ui64 index = start; index < start + limit; ++index) {
+ TUnboxedValue* current = items + index - start;
+ *current = ConvertOutputValue(data, meta.Items.front(), tzId, valueBuilder, index);
+ }
+
+ return ret.Release();
+ }
+
+ if (meta.IsTuple) {
+ auto count = meta.Items.size();
+ TUnboxedValue* items = nullptr;
+ TUnboxedValue ret = valueBuilder->NewArray(count, items);
+ const NDB::ColumnTuple& res = static_cast<const NDB::ColumnTuple&>(*col);
+ for (ui32 i = 0; i < count; ++i) {
+ items[i] = ConvertOutputValue(&res.getColumn(i), meta.Items[i], tzId, valueBuilder, externalIndex);
+ }
+
+ return ret.Release();
+ }
+
+ if (!meta.Slot) {
+ return {};
+ }
+
+ NDB::StringRef ref;
+ if (meta.IsOptional) {
+ if (data->isNullAt(externalIndex)) {
+ return {};
+ }
+
+ ref = static_cast<const NDB::ColumnNullable*>(data)->getNestedColumn().getDataAt(externalIndex);
+ }
+ else {
+ ref = data->getDataAt(externalIndex);
+ }
+
+ if (const auto slot = *meta.Slot; slot == EDataSlot::String) {
+ if (ref.size > std::numeric_limits<ui32>::max()) {
+ ythrow yexception() << "Too huge payload to build string: " << ref.size;
+ }
+
+ return valueBuilder->NewString({ ref.data, static_cast<ui32>(ref.size) }).Release();
+ }
+ else if (slot == EDataSlot::Utf8) {
+ if (!IsUtf8(std::string_view(ref))) {
+ ythrow yexception() << "Bad Utf8.";
+ }
+ if (ref.size > std::numeric_limits<ui32>::max()) {
+ ythrow yexception() << "Too huge payload to build Utf8 string: " << ref.size;
+ }
+
+ return valueBuilder->NewString({ ref.data, static_cast<ui32>(ref.size) }).Release();
+ }
+ else if (slot == EDataSlot::Json) {
+ if (!NDom::IsValidJson(std::string_view(ref))) {
+ ythrow yexception() << "Bad Json.";
+ }
+ if (ref.size > std::numeric_limits<ui32>::max()) {
+ ythrow yexception() << "Too huge payload to build Json string: " << ref.size;
+ }
+
+ return valueBuilder->NewString({ ref.data, static_cast<ui32>(ref.size) }).Release();
+ }
+ else if (slot == EDataSlot::Uuid) {
+ char uuid[16];
+ if (ref.size != sizeof(uuid)) {
+ ythrow yexception() << "Unsupported Uuid of size " << ref.size;
+ }
+ PermuteUuid<false>(ref.data, uuid);
+ return valueBuilder->NewString({ uuid, sizeof(uuid) }).Release();
+ }
+ else if (slot == EDataSlot::Decimal) {
+ if (NDecimal::TInt128 decimal; ref.size == sizeof(decimal)) {
+ std::memcpy(&decimal, ref.data, ref.size);
+ return TUnboxedValuePod(decimal);
+ }
+ else
+ ythrow yexception() << "Unsupported decimal of size " << ref.size;
+ }
+ else {
+ auto size = GetDataTypeInfo(*meta.Slot).FixedSize;
+ TUnboxedValuePod ret = TUnboxedValuePod::Zero();
+ Y_ENSURE(ref.size <= 8);
+ memcpy(&ret, ref.data, size);
+ if (tzId) {
+ if (*meta.Slot == EDataSlot::TzDatetime) {
+ ret.SetTimezoneId(tzId);
+ }
+ else if (*meta.Slot == EDataSlot::TzDate) {
+ auto& builder = valueBuilder->GetDateBuilder();
+ ui32 year, month, day;
+ if (const auto date = ret.Get<ui16>(); !builder.SplitDate(date, year, month, day)) {
+ ythrow yexception() << "Error in SplitDate(" << date << ").";
+ }
+
+ ui32 datetime;
+ if (!builder.MakeDatetime(year, month, day, 23u, 59u, 0u, datetime, tzId)) {
+ ythrow yexception() << "Error in MakeDatetime(" << year << ',' << month << ',' << day << ',' << tzId << ").";
+ }
+
+ ret = TUnboxedValuePod(ui16(datetime / 86400u));
+ ret.SetTimezoneId(tzId);
+ }
+ }
+
+ return ret;
+ }
+}
+
+void ConvertInputValue(const TUnboxedValuePod& value, NDB::IColumn* col, const TColumnMeta& meta) {
+ if (meta.IsOptional && !value)
+ return col->insert(NDB::Field());
+
+ if (meta.IsEmptyList) {
+ col->insertDefault();
+ return;
+ }
+
+ if (meta.IsList) {
+ auto& res = static_cast<NDB::ColumnArray&>(*col);
+ auto& offsets = res.getOffsets();
+ const auto data = &res.getData();
+ ui64 count = 0ULL;
+ if (const auto elements = value.GetElements()) {
+ for (const auto length = value.GetListLength(); count < length; ++count)
+ ConvertInputValue(elements[count], data, meta.Items.front());
+ } else {
+ const auto iterator = value.GetListIterator();
+ for (TUnboxedValue current; iterator.Next(current); ++count) {
+ ConvertInputValue(current, data, meta.Items.front());
+ }
+ }
+
+ const auto prev = offsets.back();
+ offsets.emplace_back(prev + count);
+ return;
+ }
+
+ if (meta.IsTuple) {
+ auto& res = static_cast<NDB::ColumnTuple&>(*col);
+ for (ui32 i = 0U; i < meta.Items.size(); ++i) {
+ const auto current = value.GetElement(i);
+ ConvertInputValue(current, &res.getColumn(i), meta.Items[i]);
+ }
+
+ return;
+ }
+
+ if (!meta.Slot) {
+ col->insertDefault();
+ return;
+ }
+
+ switch (*meta.Slot) {
+ case EDataSlot::Utf8:
+ case EDataSlot::Json:
+ case EDataSlot::String:
+ return col->insert(NDB::Field(value.AsStringRef()));
+ case EDataSlot::Bool:
+ return col->insert(NDB::Field(value.Get<bool>()));
+ case EDataSlot::Float:
+ return col->insert(NDB::Field(value.Get<float>()));
+ case EDataSlot::Double:
+ return col->insert(NDB::Field(value.Get<double>()));
+ case EDataSlot::Uint8:
+ return col->insert(NDB::Field(value.Get<ui8>()));
+ case EDataSlot::Uint16:
+ case EDataSlot::Date:
+ case EDataSlot::TzDate:
+ return col->insert(NDB::Field(value.Get<ui16>()));
+ case EDataSlot::Uint32:
+ case EDataSlot::Datetime:
+ case EDataSlot::TzDatetime:
+ return col->insert(NDB::Field(value.Get<ui32>()));
+ case EDataSlot::Uint64:
+ case EDataSlot::Timestamp:
+ case EDataSlot::TzTimestamp:
+ return col->insert(NDB::Field(value.Get<ui64>()));
+ case EDataSlot::Int8:
+ return col->insert(NDB::Field(value.Get<i8>()));
+ case EDataSlot::Int16:
+ return col->insert(NDB::Field(value.Get<i16>()));
+ case EDataSlot::Int32:
+ return col->insert(NDB::Field(value.Get<i32>()));
+ case EDataSlot::Int64:
+ return col->insert(NDB::Field(value.Get<i64>()));
+ case EDataSlot::Uuid: {
+ NDB::UUID uuid;
+ PermuteUuid<true>(value.AsStringRef().Data(), reinterpret_cast<char*>(&uuid));
+ return col->insert(NDB::Field(uuid));
+ }
+ default:
+ UdfTerminate("TODO: Unsupported field type.");
+ }
+}
+
+class TParseFromYdb : public TBoxedValue {
+public:
+ class TStreamValue : public TBoxedValue {
+ public:
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream,
+ const std::vector<TColumnMeta>& outMeta,
+ const TSourcePosition& pos,
+ ui32 tzId)
+ : ValueBuilder(valueBuilder)
+ , Stream(stream)
+ , OutMeta(outMeta)
+ , Pos(pos)
+ , TzId(tzId)
+ , Cache(OutMeta.size())
+ {}
+
+ EFetchStatus Fetch(TUnboxedValue& result) final try {
+ for (;;) {
+ if (!BlockStream) {
+ if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status)
+ return status;
+
+ const std::string_view buffer = Input.AsStringRef();
+ Buffer = std::make_unique<NDB::ReadBufferFromMemory>(buffer.data(), buffer.size());
+ BlockStream = std::make_unique<NDB::NativeBlockInputStream>(*Buffer, DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD);
+ }
+
+ if (CurrentRow >= CurrentBlock.rows()) {
+ CurrentRow = 0;
+ if (CurrentBlock = BlockStream->read(); !CurrentBlock) {
+ BlockStream.reset();
+ Buffer.reset();
+ continue;
+ }
+ }
+
+ TUnboxedValue* items = nullptr;
+ result = Cache.NewArray(*ValueBuilder, items);
+ for (ui32 i = 0; i < OutMeta.size(); ++i) {
+ *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow);
+ }
+
+ ++CurrentRow;
+ return EFetchStatus::Ok;
+ }
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+ private:
+ const IValueBuilder* ValueBuilder;
+ const TUnboxedValue Stream;
+ const std::vector<TColumnMeta> OutMeta; // in struct order
+ const TSourcePosition Pos;
+ const ui32 TzId;
+
+ TPlainArrayCache Cache;
+
+ TUnboxedValue Input;
+ std::unique_ptr<NDB::ReadBuffer> Buffer;
+ std::unique_ptr<NDB::IBlockInputStream> BlockStream;
+ NDB::Block CurrentBlock;
+ size_t CurrentRow = 0;
+ };
+
+ TParseFromYdb(const TSourcePosition& pos, std::vector<TColumnMeta>&& metaForColumns)
+ : Pos(pos), OutMeta(std::move(metaForColumns))
+ {}
+
+ TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
+ ui32 tzId = 0U;
+ if (const auto& tz = args[1U]) {
+ if (!valueBuilder->GetDateBuilder().FindTimezoneId(tz.AsStringRef(), tzId)) {
+ tzId = 0U;
+ }
+ }
+
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, OutMeta, Pos, tzId));
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+private:
+ const TSourcePosition Pos;
+ const std::vector<TColumnMeta> OutMeta; // in struct order
+};
+
+class TStreamValue : public TBoxedValue {
+public:
+ TStreamValue(const std::string& type, const NDB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream,
+ const std::vector<TColumnMeta> outMeta, const NDB::ColumnsWithTypeAndName& columns, ui32 tupleSize, const TSourcePosition& pos, ui32 tzId)
+ : ValueBuilder(valueBuilder)
+ , Stream(stream)
+ , OutMeta(outMeta)
+ , Columns(columns)
+ , TupleSize(tupleSize)
+ , Pos(pos)
+ , TzId(tzId)
+ , Cache(OutMeta.size())
+ , TupleCache(tupleSize)
+ , Type(type)
+ , Settings(settings)
+ {}
+private:
+ EFetchStatus Fetch(TUnboxedValue& result) final try {
+ for (;;) {
+ if (!BlockStream) {
+ if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status)
+ return status;
+
+ InputElement = TupleSize ? Input.GetElement(0) : Input;
+ const std::string_view buffer = InputElement.AsStringRef();
+ Buffer = std::make_unique<NDB::ReadBufferFromMemory>(buffer.data(), buffer.size());
+ BlockStream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(Type, *Buffer, NDB::Block(Columns), nullptr, buffer.size(), Settings));
+ }
+
+ if (CurrentRow >= CurrentBlock.rows()) {
+ CurrentRow = 0;
+ if (CurrentBlock = BlockStream->read(); !CurrentBlock) {
+ BlockStream.reset();
+ Buffer.reset();
+ continue;
+ }
+ }
+
+ TUnboxedValue* items = nullptr;
+ result = Cache.NewArray(*ValueBuilder, items);
+ for (ui32 i = 0; i < OutMeta.size(); ++i) {
+ *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow);
+ }
+
+ if (TupleSize) {
+ TUnboxedValue* tupleItems = nullptr;
+ auto tuple = TupleCache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = result;
+ for (ui32 i = 1; i < TupleSize; ++i) {
+ *tupleItems++ = Input.GetElement(i);
+ }
+ result = tuple;
+ }
+
+ ++CurrentRow;
+ return EFetchStatus::Ok;
+ }
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+
+ const IValueBuilder* ValueBuilder;
+ const TUnboxedValue Stream;
+ const std::vector<TColumnMeta> OutMeta;
+ const NDB::ColumnsWithTypeAndName Columns;
+ const ui32 TupleSize;
+ const TSourcePosition Pos;
+ const ui32 TzId;
+
+ TPlainArrayCache Cache;
+ TPlainArrayCache TupleCache;
+
+ TUnboxedValue Input;
+ TUnboxedValue InputElement;
+ const TString Type;
+ const NDB::FormatSettings Settings;
+
+ std::unique_ptr<NDB::ReadBuffer> Buffer;
+ std::unique_ptr<NDB::IBlockInputStream> BlockStream;
+ NDB::Block CurrentBlock;
+ size_t CurrentRow = 0U;
+};
+
+NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
+ NDB::FormatSettings settings;
+ settings.skip_unknown_fields = true;
+ settings.with_names_use_header = true;
+ if (!view.empty()) {
+ const std::string str(view);
+ const JSON json(str);
+#define SUPPORTED_FLAGS(xx) \
+ xx(skip_unknown_fields) \
+ xx(import_nested_json) \
+ xx(with_names_use_header) \
+ xx(null_as_default) \
+
+#define SET_FLAG(flag) \
+ if (json.has(#flag)) \
+ settings.flag = json[#flag].get<bool>();
+
+ SUPPORTED_FLAGS(SET_FLAG)
+
+#undef SET_FLAG
+#undef SUPPORTED_FLAGS
+ }
+ return settings;
+}
+
+class TParseFormat : public TBoxedValue {
+public:
+ TParseFormat(const std::string_view& type, const std::string_view& settings,
+ const TSourcePosition& pos, std::vector<TColumnMeta>&& outMeta,
+ NDB::ColumnsWithTypeAndName&& columns, ui32 tupleSize)
+ : Type(type)
+ , Settings(GetFormatSettings(settings))
+ , Pos(pos)
+ , OutMeta(std::move(outMeta))
+ , Columns(std::move(columns))
+ , TupleSize(tupleSize)
+ {}
+
+ TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
+ ui32 tzId = 0U;
+ if (const auto& tz = args[1U]) {
+ if (!valueBuilder->GetDateBuilder().FindTimezoneId(tz.AsStringRef(), tzId)) {
+ tzId = 0U;
+ }
+ }
+
+ return TUnboxedValuePod(new TStreamValue(Type, Settings, valueBuilder, *args, OutMeta, Columns, TupleSize, Pos, tzId));
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+private:
+ const std::string Type;
+ const NDB::FormatSettings Settings;
+ const TSourcePosition Pos;
+ const std::vector<TColumnMeta> OutMeta;
+ const NDB::ColumnsWithTypeAndName Columns;
+ const ui32 TupleSize;
+};
+
+class TParseBlocks : public TBoxedValue {
+ class TStreamValue : public TBoxedValue {
+ public:
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::vector<TColumnMeta> outMeta, ui32 tupleSize, const TSourcePosition& pos, ui32 tzId)
+ : ValueBuilder(valueBuilder)
+ , Stream(stream)
+ , OutMeta(outMeta)
+ , TupleSize(tupleSize)
+ , Pos(pos)
+ , TzId(tzId)
+ , Cache(OutMeta.size())
+ , TupleCache(tupleSize)
+ {}
+ private:
+ EFetchStatus Fetch(TUnboxedValue& result) final try {
+ while (true) {
+ if (!Input) {
+ if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status)
+ return status;
+ }
+
+ const auto block = static_cast<NDB::Block*>(TupleSize ? Input.GetElement(0).GetResource() :
+ Input.GetResource());
+
+ if (CurrentRow >= block->rows()) {
+ CurrentRow = 0;
+ Input = NUdf::TUnboxedValuePod();
+ continue;
+ }
+
+ TUnboxedValue* items = nullptr;
+ result = Cache.NewArray(*ValueBuilder, items);
+ for (ui32 i = 0; i < OutMeta.size(); ++i) {
+ *items++ = ConvertOutputValue(block->getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow);
+ }
+
+ if (TupleSize) {
+ TUnboxedValue* tupleItems = nullptr;
+ auto tuple = TupleCache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = result;
+ for (ui32 i = 1; i < TupleSize; ++i) {
+ *tupleItems++ = Input.GetElement(i);
+ }
+ result = tuple;
+ }
+
+ ++CurrentRow;
+ return EFetchStatus::Ok;
+ }
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+
+ const IValueBuilder* ValueBuilder;
+ const TUnboxedValue Stream;
+ const std::vector<TColumnMeta> OutMeta;
+ const ui32 TupleSize;
+ const TSourcePosition Pos;
+ const ui32 TzId;
+
+ TPlainArrayCache Cache;
+ TPlainArrayCache TupleCache;
+
+ TUnboxedValue Input;
+ size_t CurrentRow = 0U;
+ };
+
+public:
+ TParseBlocks(const TSourcePosition& pos, std::vector<TColumnMeta>&& outMeta, ui32 tupleSize)
+ : Pos(pos), OutMeta(std::move(outMeta)), TupleSize(tupleSize)
+ {}
+
+ TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
+ ui32 tzId = 0U;
+ if (const auto& tz = args[1U]) {
+ if (!valueBuilder->GetDateBuilder().FindTimezoneId(tz.AsStringRef(), tzId)) {
+ tzId = 0U;
+ }
+ }
+
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, OutMeta, TupleSize, Pos, tzId));
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+private:
+ const TSourcePosition Pos;
+ const std::vector<TColumnMeta> OutMeta;
+ const ui32 TupleSize;
+};
+
+class TSerializeFormat : public TBoxedValue {
+ class TStreamValue : public TBoxedValue {
+ private:
+ using TPartitionByKey = std::vector<TUnboxedValue, TStdAllocatorForUdf<TUnboxedValue>>;
+
+ struct TPartitionMapStuff {
+ explicit TPartitionMapStuff(size_t size) : Size(size) {}
+
+ bool operator()(const TPartitionByKey& left, const TPartitionByKey& right) const {
+ for (auto i = 0U; i < Size; ++i)
+ if (left[i].AsStringRef() != right[i].AsStringRef())
+ return false;
+
+ return true;
+ }
+
+ size_t operator()(const TPartitionByKey& value) const {
+ size_t hash = 0ULL;
+ for (auto i = 0U; i < Size; ++i)
+ hash = CombineHashes(hash, std::hash<std::string_view>()(value[i].AsStringRef()));
+ return hash;
+ }
+
+ const size_t Size;
+ };
+
+ using TPartitionByPayload = std::pair<NDB::Block, std::unique_ptr<NDB::IBlockOutputStream>>;
+ using TPartitionsMap = std::unordered_map<TPartitionByKey, TPartitionByPayload, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, TPartitionByPayload>>>;
+ public:
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
+ : ValueBuilder(valueBuilder)
+ , Stream(stream)
+ , InMeta(inMeta)
+ , KeysIndexes(keysIndexes)
+ , PayloadsIndexes(payloadsIndexes)
+ , BlockSizeLimit(blockSizeLimit)
+ , KeysCountLimit(keysCountLimit)
+ , TotalSizeLimit(totalSizeLimit)
+ , Pos(pos)
+ , HeaderBlock(columns)
+ , Buffer(std::make_unique<NDB::WriteBufferFromOwnString>())
+ , Settings(settings)
+ , Type(type)
+ , PartitionsMap(0ULL, TPartitionMapStuff(KeysIndexes.size()), TPartitionMapStuff(KeysIndexes.size()))
+ , Cache(KeysIndexes.size() + 1U)
+ {}
+ private:
+ void FlushKey(const TPartitionsMap::iterator it, TUnboxedValue& result) {
+ TotalSize -= it->second.first.bytes();
+ if (!it->second.second) {
+ it->second.second = std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(Type, *Buffer, HeaderBlock, nullptr, {}, Settings));
+ it->second.second->writePrefix();
+ }
+ it->second.second->write(it->second.first);
+ it->second.second->writeSuffix();
+ it->second.second->flush();
+ it->second.second.reset();
+ it->second.first = HeaderBlock.cloneEmpty();
+
+ if (KeysIndexes.empty())
+ result = ValueBuilder->NewString(Buffer->str());
+ else {
+ TUnboxedValue* tupleItems = nullptr;
+ result = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = ValueBuilder->NewString(Buffer->str());
+ std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ }
+
+ Buffer->restart();
+ }
+
+ EFetchStatus Fetch(TUnboxedValue& result) final try {
+ if (IsFinished && PartitionsMap.empty())
+ return EFetchStatus::Finish;
+
+ for (TUnboxedValue row; !IsFinished;) {
+ switch (const auto status = Stream.Fetch(row)) {
+ case EFetchStatus::Yield:
+ return EFetchStatus::Yield;
+ case EFetchStatus::Ok: {
+ if (TotalSizeLimit && TotalSizeLimit < TotalSize) {
+ const auto top = std::max_element(PartitionsMap.begin(), PartitionsMap.end(),
+ [](const TPartitionsMap::value_type& l, const TPartitionsMap::value_type& r){ return l.second.first.bytes(), r.second.first.bytes(); });
+ FlushKey(top, result);
+ }
+
+ TPartitionByKey keys(KeysIndexes.size());
+ for (auto i = 0U; i < keys.size(); ++i)
+ keys[i] = row.GetElement(KeysIndexes[i]);
+
+ const auto ins = PartitionsMap.emplace(keys, std::make_pair(HeaderBlock.cloneEmpty(), nullptr));
+
+ if (ins.second && PartitionsMap.size() > KeysCountLimit) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " Too many unique keys: " << PartitionsMap.size()).data());
+ }
+
+ const bool flush = !ins.second && ins.first->second.first.bytes() >= BlockSizeLimit;
+ if (flush)
+ FlushKey(ins.first, result);
+
+ TotalSize -= ins.first->second.first.bytes();
+ auto columns = ins.first->second.first.mutateColumns();
+ for (auto i = 0U; i < columns.size(); ++i) {
+ const auto index = PayloadsIndexes[i];
+ ConvertInputValue(row.GetElement(index), columns[i].get(), InMeta[index]);
+ }
+ ins.first->second.first.setColumns(std::move(columns));
+ TotalSize += ins.first->second.first.bytes();
+
+ if (flush)
+ return EFetchStatus::Ok;
+ else
+ continue;
+ }
+ case EFetchStatus::Finish:
+ IsFinished = true;
+ break;
+ }
+ }
+
+ if (PartitionsMap.empty())
+ return EFetchStatus::Finish;
+
+ FlushKey(PartitionsMap.begin(), result);
+ PartitionsMap.erase(PartitionsMap.cbegin());
+ return EFetchStatus::Ok;
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+
+ const IValueBuilder* ValueBuilder;
+ const TUnboxedValue Stream;
+ const std::vector<TColumnMeta> InMeta;
+ const std::vector<ui32> KeysIndexes;
+ const std::vector<ui32> PayloadsIndexes;
+ const size_t BlockSizeLimit;
+ const size_t KeysCountLimit;
+ const size_t TotalSizeLimit;
+ const TSourcePosition Pos;
+
+ const NDB::Block HeaderBlock;
+ const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer;
+ const NDB::FormatSettings Settings;
+ const std::string Type;
+
+ TPartitionsMap PartitionsMap;
+
+ TPlainArrayCache Cache;
+
+ bool IsFinished = false;
+ size_t TotalSize = 0ULL;
+ };
+public:
+ TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
+ : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizeLimit), KeysCountLimit(keysCountLimit), TotalSizeLimit(totalSizeLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
+ {}
+
+ TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, TotalSizeLimit, InMeta, Columns, Pos));
+ }
+ catch (const Poco::Exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+ }
+ catch (const std::exception& e) {
+ UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+ }
+private:
+ const std::string Type;
+ const NDB::FormatSettings Settings;
+ const std::vector<ui32> KeysIndexes;
+ const std::vector<ui32> PayloadsIndexes;
+ const size_t BlockSizeLimit;
+ const size_t KeysCountLimit;
+ const size_t TotalSizeLimit;
+ const TSourcePosition Pos;
+
+ const std::vector<TColumnMeta> InMeta;
+ const NDB::ColumnsWithTypeAndName Columns;
+};
+
+struct TCHInitializer {
+ using TWeakPtr = std::weak_ptr<TCHInitializer>;
+ using TPtr = std::shared_ptr<TCHInitializer>;
+
+ TCHInitializer()
+ {
+ NDB::registerFormats();
+ }
+};
+
+template <typename T>
+TString MakeEnumImpl(const T& values) {
+ TStringBuilder str;
+ str << "Enum<";
+ bool first = true;
+ for (const auto& value : values) {
+ if (!first) {
+ str << ',';
+ }
+ else {
+ first = false;
+ }
+
+ str << "'" << value.first << "'";
+ }
+
+ str << '>';
+ return str;
+}
+
+std::optional<TString> MakeYqlType(NDB::DataTypePtr type, bool validTz) {
+ if (type->getTypeId() == NDB::TypeIndex::Enum8) {
+ const NDB::DataTypeEnum8* enum8 = NDB::checkAndGetDataType<NDB::DataTypeEnum8>(type.get());
+ return MakeEnumImpl(enum8->getValues());
+ }
+
+ if (type->getTypeId() == NDB::TypeIndex::Enum16) {
+ const NDB::DataTypeEnum16* enum16 = NDB::checkAndGetDataType<NDB::DataTypeEnum16>(type.get());
+ return MakeEnumImpl(enum16->getValues());
+ }
+
+ if (type->getTypeId() == NDB::TypeIndex::AggregateFunction) {
+ return "Tagged<String,'" + TString(type->getName()) + "'>";
+ }
+
+ if (type->getTypeId() == NDB::TypeIndex::Array) {
+ const NDB::DataTypeArray* array = NDB::checkAndGetDataType<NDB::DataTypeArray>(type.get());
+ type = array->getNestedType();
+ if (type->getTypeId() == NDB::TypeIndex::Nothing) {
+ return "EmptyList";
+ }
+
+ auto inner = MakeYqlType(type, validTz);
+ if (!inner) {
+ return std::nullopt;
+ }
+
+ return "List<" + *inner + '>';
+ }
+
+ if (type->getTypeId() == NDB::TypeIndex::Tuple) {
+ const NDB::DataTypeTuple* tuple = NDB::checkAndGetDataType<NDB::DataTypeTuple>(type.get());
+ const auto& elems = tuple->getElements();
+ TStringBuilder str;
+ str << "Tuple<";
+ bool first = true;
+ for (const auto& e : elems) {
+ auto inner = MakeYqlType(e, validTz);
+ if (!inner) {
+ return std::nullopt;
+ }
+
+ if (!first) {
+ str << ',';
+ } else {
+ first = false;
+ }
+
+ str << *inner;
+ }
+
+ str << '>';
+ return str;
+ }
+
+ if (type->isNullable()) {
+ type = removeNullable(type);
+ if (type->getTypeId() == NDB::TypeIndex::Nothing) {
+ return "Null";
+ }
+
+ auto inner = MakeYqlType(type, validTz);
+ if (!inner) {
+ return std::nullopt;
+ }
+
+ return "Optional<" + *inner + '>';
+ }
+
+ if (type->getTypeId() == NDB::TypeIndex::UInt8) return "Uint8";
+ else if (type->getTypeId() == NDB::TypeIndex::Int8) return "Int8";
+ else if (type->getTypeId() == NDB::TypeIndex::UInt16) return "Uint16";
+ else if (type->getTypeId() == NDB::TypeIndex::Int16) return "Int16";
+ else if (type->getTypeId() == NDB::TypeIndex::UInt32) return "Uint32";
+ else if (type->getTypeId() == NDB::TypeIndex::Int32) return "Int32";
+ else if (type->getTypeId() == NDB::TypeIndex::UInt64) return "Uint64";
+ else if (type->getTypeId() == NDB::TypeIndex::Int64) return "Int64";
+ else if (type->getTypeId() == NDB::TypeIndex::Float32) return "Float";
+ else if (type->getTypeId() == NDB::TypeIndex::Float64) return "Double";
+ else if (type->getTypeId() == NDB::TypeIndex::String) return "String";
+ else if (type->getTypeId() == NDB::TypeIndex::FixedString) return "String";
+ else if (validTz && type->getTypeId() == NDB::TypeIndex::Date) return "TzDate";
+ else if (validTz && type->getTypeId() == NDB::TypeIndex::DateTime) return "TzDatetime";
+ else if (type->getTypeId() == NDB::TypeIndex::UUID) return "Uuid";
+ else return std::nullopt;
+}
+
+SIMPLE_UDF(TToYqlType, TOptional<TUtf8>(TUtf8, TUtf8)) {
+ const auto ref = args[0].AsStringRef();
+ const auto tzRef = args[1].AsStringRef();
+ ui32 tzId;
+ const bool validTz = valueBuilder->GetDateBuilder().FindTimezoneId(tzRef, tzId);
+ const NDB::String typeStr(ref.Data(), ref.Data() + ref.Size());
+ const auto type = NDB::DataTypeFactory::instance().get(typeStr);
+ if (const auto ret = MakeYqlType(type, validTz)) {
+ return valueBuilder->NewString(*ret);
+ }
+ return TUnboxedValue();
+}
+
+}
+
+class TClickHouseClientModule : public IUdfModule
+{
+public:
+ TClickHouseClientModule() = default;
+
+ static const TStringRef& Name() {
+ static constexpr auto name = TStringRef::Of("ClickHouseClient");
+ return name;
+ }
+
+ void CleanupOnTerminate() const final {}
+
+ void GetAllFunctions(IFunctionsSink& sink) const final {
+ sink.Add(TStringRef::Of("ToYqlType"));
+ sink.Add(TStringRef::Of("ParseFormat"))->SetTypeAwareness();
+ sink.Add(TStringRef::Of("ParseBlocks"))->SetTypeAwareness();
+ sink.Add(TStringRef::Of("ParseFromYdb"))->SetTypeAwareness();
+ sink.Add(TStringRef::Of("SerializeFormat"))->SetTypeAwareness();
+ }
+
+ void BuildFunctionTypeInfo(
+ const TStringRef& name,
+ TType* userType,
+ const TStringRef& typeConfig,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& builder) const final try {
+ LazyInitContext();
+ auto argBuilder = builder.Args();
+ if (name == "ToYqlType") {
+ argBuilder->Add<TUtf8>();
+ argBuilder->Add<TUtf8>();
+ argBuilder->Done();
+ builder.Returns<TOptional<TUtf8>>();
+ if (!(flags & TFlags::TypesOnly)) {
+ builder.Implementation(new TToYqlType(builder));
+ }
+
+ return;
+ }
+
+ const auto typeHelper = builder.TypeInfoHelper();
+ if (name == "ParseBlocks") {
+ const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType);
+ if (!userTypeInspector || userTypeInspector.GetElementsCount() < 3) {
+ return builder.SetError("Invalid user type.");
+ }
+
+ const auto argsTypeTuple = userTypeInspector.GetElementType(0);
+ const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple);
+ if (!argsTypeInspector) {
+ return builder.SetError("Invalid user type - expected tuple.");
+ }
+
+ if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount < 1 || argsCount > 2) {
+ ::TStringBuilder sb;
+ sb << "Invalid user type - expected one or two arguments, got: " << argsCount;
+ return builder.SetError(sb);
+ }
+
+ ui32 tupleSize = 0;
+ TType* inputItemType = builder.Resource("ClickHouseClient.Block");
+
+ const auto resultType = userTypeInspector.GetElementType(2);
+ auto structType = TStructTypeInspector(*typeHelper, resultType);
+ if (!structType) {
+ if (const auto tupleType = TTupleTypeInspector(*typeHelper, resultType)) {
+ tupleSize = tupleType.GetElementsCount();
+ if (tupleSize > 0) {
+ structType = TStructTypeInspector(*typeHelper, tupleType.GetElementType(0));
+ auto tupleBuilder = builder.Tuple(tupleSize);
+ tupleBuilder->Add(inputItemType);
+ for (ui32 i = 1; i < tupleSize; ++i) {
+ tupleBuilder->Add(tupleType.GetElementType(i));
+ }
+ inputItemType = tupleBuilder->Build();
+ }
+ }
+ }
+
+ builder.UserType(userType);
+ builder.Args()->Add(builder.Stream()->Item(inputItemType)).Add<TOptional<TUtf8>>().Done();
+ builder.OptionalArgs(1U);
+ builder.Returns(builder.Stream()->Item(resultType));
+
+ if (structType) {
+ std::vector<TColumnMeta> outMeta(structType.GetMembersCount());
+ for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) {
+ if (auto& meta = outMeta[i]; !GetDataType(builder, *typeHelper, structType.GetMemberType(i), meta)) {
+ ::TStringBuilder sb;
+ sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: ";
+ TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ }
+
+ if (!(flags & TFlags::TypesOnly)) {
+ builder.Implementation(new TParseBlocks(builder.GetSourcePosition(), std::move(outMeta), tupleSize));
+ }
+ return;
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible row type: ";
+ TTypePrinter(*typeHelper, resultType).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ } else if (name == "ParseFormat") {
+ const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType);
+ if (!userTypeInspector || userTypeInspector.GetElementsCount() < 3) {
+ return builder.SetError("Invalid user type.");
+ }
+
+ const auto argsTypeTuple = userTypeInspector.GetElementType(0);
+ const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple);
+ if (!argsTypeInspector) {
+ return builder.SetError("Invalid user type - expected tuple.");
+ }
+
+ if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount < 1 || argsCount > 2) {
+ ::TStringBuilder sb;
+ sb << "Invalid user type - expected one or two arguments, got: " << argsCount;
+ return builder.SetError(sb);
+ }
+
+ ui32 tupleSize = 0;
+ TType* inputItemType = builder.Primitive(GetDataTypeInfo(EDataSlot::String).TypeId);
+
+ const auto resultType = userTypeInspector.GetElementType(2);
+ auto structType = TStructTypeInspector(*typeHelper, resultType);
+ if (!structType) {
+ if (const auto tupleType = TTupleTypeInspector(*typeHelper, resultType)) {
+ tupleSize = tupleType.GetElementsCount();
+ if (tupleSize > 0) {
+ structType = TStructTypeInspector(*typeHelper, tupleType.GetElementType(0));
+ auto tupleBuilder = builder.Tuple(tupleSize);
+ tupleBuilder->Add(inputItemType);
+ for (ui32 i = 1; i < tupleSize; ++i) {
+ tupleBuilder->Add(tupleType.GetElementType(i));
+ }
+ inputItemType = tupleBuilder->Build();
+ }
+ }
+ }
+
+ builder.UserType(userType);
+ builder.Args()->Add(builder.Stream()->Item(inputItemType)).Add<TOptional<TUtf8>>().Done();
+ builder.OptionalArgs(1U);
+ builder.Returns(builder.Stream()->Item(resultType));
+
+ if (structType) {
+ std::vector<TColumnMeta> outMeta(structType.GetMembersCount());
+ NDB::ColumnsWithTypeAndName columns(structType.GetMembersCount());
+ for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) {
+ if (auto& meta = outMeta[i]; GetDataType(builder, *typeHelper, structType.GetMemberType(i), meta)) {
+ auto& colsumn = columns[i];
+ colsumn.type = MetaToClickHouse(meta);
+ colsumn.name = structType.GetMemberName(i);
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: ";
+ TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ }
+
+ if (!(flags & TFlags::TypesOnly)) {
+ const std::string_view& typeCfg = typeConfig;
+ const auto jsonFrom = typeCfg.find('{');
+ builder.Implementation(new TParseFormat(typeCfg.substr(0U, jsonFrom),
+ std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom),
+ builder.GetSourcePosition(),
+ std::move(outMeta),
+ std::move(columns),
+ tupleSize));
+ }
+ return;
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible row type: ";
+ TTypePrinter(*typeHelper, resultType).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ } else if (name == "SerializeFormat") {
+ const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType);
+ if (!userTypeInspector || userTypeInspector.GetElementsCount() < 1) {
+ return builder.SetError("Invalid user type.");
+ }
+
+ const auto argsTypeTuple = userTypeInspector.GetElementType(0);
+ const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple);
+ if (!argsTypeInspector) {
+ return builder.SetError("Invalid user type - expected tuple.");
+ }
+
+ if (const auto argsCount = argsTypeInspector.GetElementsCount(); argsCount != 1) {
+ ::TStringBuilder sb;
+ sb << "Invalid user type - expected one or two arguments, got: " << argsCount;
+ return builder.SetError(sb);
+ }
+
+ const auto inputType = argsTypeInspector.GetElementType(0);
+
+ const std::string_view& typeCfg = typeConfig;
+ const auto jsonFrom = typeCfg.find('{');
+ std::vector<std::string> keys;
+ const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom);
+ size_t blockSizeLimit = 256_MB;
+ size_t keysCountLimit = 4096;
+ size_t totalSizeLimit = 1_GB;
+ if (!tail.empty()) {
+ const std::string str(tail);
+ const JSON json(str);
+ if (json.has("keys")) {
+ const auto& list = json["keys"];
+ keys.reserve(list.size());
+ std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString));
+ }
+ if (json.has("block_size_limit")) {
+ blockSizeLimit = json["block_size_limit"].getUInt();
+ }
+ if (json.has("keys_count_limit")) {
+ keysCountLimit = json["keys_count_limit"].getUInt();
+ }
+ if (json.has("total_size_limit")) {
+ totalSizeLimit = json["total_size_limit"].getUInt();
+ }
+ }
+
+ builder.UserType(userType);
+ builder.Args()->Add(inputType).Done();
+ if (keys.empty())
+ builder.Returns(builder.Stream()->Item<const char*>());
+ else {
+ const auto tuple = builder.Tuple();
+ tuple->Add<const char*>();
+ for (auto k =0U; k < keys.size(); ++k)
+ tuple->Add<TUtf8>();
+ builder.Returns(builder.Stream()->Item(tuple->Build()));
+ }
+
+ if (const auto structType = TStructTypeInspector(*typeHelper, TStreamTypeInspector(*typeHelper, inputType).GetItemType())) {
+ std::vector<TColumnMeta> inMeta(structType.GetMembersCount());
+ std::vector<ui32> keyIndexes(keys.size()), payloadIndexes(structType.GetMembersCount() - keys.size());
+ NDB::ColumnsWithTypeAndName columns;
+ columns.reserve(payloadIndexes.size());
+ for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) {
+ if (auto& meta = inMeta[i]; GetDataType(builder, *typeHelper, structType.GetMemberType(i), meta)) {
+ const std::string_view name = structType.GetMemberName(i);
+ bool payload = true;
+ for (ui32 k = 0U; k < keys.size(); ++k) {
+ if (keys[k] == name) {
+ keyIndexes[k] = i;
+ payload = false;
+ }
+ }
+
+ if (payload) {
+ payloadIndexes[columns.size()] = i;
+ columns.emplace_back(MetaToClickHouse(meta), std::string(name));
+ }
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: ";
+ TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ }
+
+ if (!(flags & TFlags::TypesOnly)) {
+ builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, totalSizeLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
+ }
+ return;
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible row type: ";
+ TTypePrinter(*typeHelper, inputType).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ } else if (name == "ParseFromYdb") {
+ builder.UserType(userType);
+ builder.Args()->Add(builder.Stream()->Item<char*>()).Add<TOptional<TUtf8>>().Done();
+ builder.OptionalArgs(1U);
+ builder.Returns(builder.Stream()->Item(userType));
+
+ if (const auto structType = TStructTypeInspector(*typeHelper, userType)) {
+ std::vector<TColumnMeta> columns(structType.GetMembersCount());
+ for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) {
+ if (const auto dataType = TDataAndDecimalTypeInspector(*typeHelper, TOptionalTypeInspector(*typeHelper, structType.GetMemberType(i)).GetItemType())) {
+ auto& meta = columns[i];
+ meta.Slot = GetDataSlot(dataType.GetTypeId());
+ meta.IsOptional = true;
+ meta.Precision = dataType.GetPrecision();
+ meta.Scale = dataType.GetScale();
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: ";
+ TTypePrinter(*typeHelper, structType.GetMemberType(i)).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ }
+
+ if (!(flags & TFlags::TypesOnly)) {
+ builder.Implementation(new TParseFromYdb(builder.GetSourcePosition(), std::move(columns)));
+ }
+ return;
+ } else {
+ ::TStringBuilder sb;
+ sb << "Incompatible row type: ";
+ TTypePrinter(*typeHelper, userType).Out(sb.Out);
+ return builder.SetError(sb);
+ }
+ }
+ }
+ catch (const Poco::Exception& e) {
+ builder.SetError(e.displayText());
+ }
+ catch (const std::exception& e) {
+ builder.SetError(TStringBuf(e.what()));
+ }
+private:
+ void LazyInitContext() const {
+ const std::unique_lock lock(CtxMutex);
+ if (!Ctx) {
+ if (auto ctx = StaticCtx.lock()) {
+ Ctx = std::move(ctx);
+ } else {
+ StaticCtx = Ctx = std::make_shared<TCHInitializer>();
+ }
+ }
+ }
+
+ static std::mutex CtxMutex;
+ static TCHInitializer::TWeakPtr StaticCtx;
+ mutable TCHInitializer::TPtr Ctx;
+};
+
+std::mutex TClickHouseClientModule::CtxMutex;
+TCHInitializer::TWeakPtr TClickHouseClientModule::StaticCtx;
+
+REGISTER_MODULES(TClickHouseClientModule);
\ No newline at end of file |