diff options
author | vvvv <vvvv@yandex-team.com> | 2025-02-13 22:36:29 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2025-02-13 22:53:27 +0300 |
commit | dcc9572fc76d9c740c97de127bdee1e0b71273b0 (patch) | |
tree | 48e1e3c9e31e31a346e6905dcf4dc709dd6be802 | |
parent | a487ac9a1cd2053ec24449dd6c4a7e70c1cc2b8d (diff) | |
download | ydb-dcc9572fc76d9c740c97de127bdee1e0b71273b0.tar.gz |
Refactor out codec for table format
init
commit_hash:65402bd8880a077306c1ded09b6d1aa8ea03cd1e
-rw-r--r-- | yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp | 7 | ||||
-rw-r--r-- | yql/essentials/minikql/protobuf_udf/ut/ya.make | 1 | ||||
-rw-r--r-- | yql/essentials/providers/common/codec/ya.make | 1 | ||||
-rw-r--r-- | yql/essentials/providers/common/codec/yql_codec.cpp | 1691 | ||||
-rw-r--r-- | yql/essentials/providers/common/codec/yql_codec.h | 34 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/codegen/ya.make.inc | 1 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp | 7 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/ya.make | 1 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/yt_codec.cpp | 1918 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/yt_codec.h | 26 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/yt_codec_io.cpp | 16 | ||||
-rw-r--r-- | yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp | 2 |
12 files changed, 2056 insertions, 1649 deletions
diff --git a/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp b/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp index 4d8a77dced..a3ed5f796b 100644 --- a/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp +++ b/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp @@ -5,6 +5,7 @@ #include <yql/essentials/providers/common/codec/yql_codec.h> #include <yql/essentials/providers/common/codec/yql_codec_buf.h> +#include <yt/yql/providers/yt/codec/yt_codec.h> #include <yql/essentials/minikql/mkql_alloc.h> #include <yql/essentials/minikql/mkql_node.h> #include <yql/essentials/minikql/mkql_type_builder.h> @@ -58,12 +59,12 @@ struct TSetup { template <typename TProto> TString YsonToProtoText(TSetup& setup, NUdf::TProtoInfo& info, TStringBuf yson) { TStringStream err; - auto val = NCommon::ParseYsonValue( + auto val = ParseYsonValueInTableFormat( setup.HolderFactory, NYT::NodeToYsonString(NYT::NodeFromYsonString(yson), ::NYson::EYsonFormat::Binary), static_cast<NKikimr::NMiniKQL::TStructType*>(info.StructType), 0, - &err, true); + &err); if (!val) { throw yexception() << err.Str(); } @@ -120,7 +121,7 @@ TString ProtoTextToYson(TSetup& setup, NUdf::TProtoInfo& info, TStringBuf protoT auto value = FillValueFromProto(proto, &setup.ValueBuilder, info); TTestWriter out; NCommon::TOutputBuf buf(out, nullptr); - NCommon::WriteYsonValueInTableFormat(buf, static_cast<NKikimr::NMiniKQL::TStructType*>(info.StructType), 0, value, true); + WriteYsonValueInTableFormat(buf, static_cast<NKikimr::NMiniKQL::TStructType*>(info.StructType), 0, value, true); buf.Finish(); return NYT::NodeToYsonString(NYT::NodeFromYsonString(out.Str()), ::NYson::EYsonFormat::Text); diff --git a/yql/essentials/minikql/protobuf_udf/ut/ya.make b/yql/essentials/minikql/protobuf_udf/ut/ya.make index dd4741870f..e788d5da8d 100644 --- a/yql/essentials/minikql/protobuf_udf/ut/ya.make +++ b/yql/essentials/minikql/protobuf_udf/ut/ya.make @@ -11,6 +11,7 @@ SRCS( PEERDIR( yt/yql/providers/yt/lib/schema yt/yql/providers/yt/common + yt/yql/providers/yt/codec yql/essentials/public/udf/service/exception_policy yql/essentials/minikql yql/essentials/public/udf diff --git a/yql/essentials/providers/common/codec/ya.make b/yql/essentials/providers/common/codec/ya.make index ca9d05dd2a..a55ef22bbf 100644 --- a/yql/essentials/providers/common/codec/ya.make +++ b/yql/essentials/providers/common/codec/ya.make @@ -19,7 +19,6 @@ PEERDIR( library/cpp/yson library/cpp/json library/cpp/enumbitset - yt/yt/library/decimal ) YQL_LAST_ABI_VERSION() diff --git a/yql/essentials/providers/common/codec/yql_codec.cpp b/yql/essentials/providers/common/codec/yql_codec.cpp index 6710498276..04517c6e2e 100644 --- a/yql/essentials/providers/common/codec/yql_codec.cpp +++ b/yql/essentials/providers/common/codec/yql_codec.cpp @@ -23,8 +23,6 @@ #include <util/string/cast.h> #include <util/generic/map.h> -#include <yt/yt/library/decimal/decimal.h> - namespace NYql { namespace NCommon { @@ -289,7 +287,7 @@ TMaybe<TVector<ui32>> CreateStructPositions(TType* inputType, const TVector<TStr if (inputType->GetKind() != TType::EKind::Struct) { return Nothing(); } - + auto inputStruct = AS_TYPE(TStructType, inputType); TMap<TStringBuf, ui32> members; TVector<ui32> structPositions(inputStruct->GetMembersCount(), Max<ui32>()); @@ -784,119 +782,66 @@ T ReadNextSerializedNumber(char cmd, TInputBuf& buf) { } template <typename T> -T ReadYsonFloatNumber(char cmd, TInputBuf& buf, bool isTableFormat) { - if (isTableFormat) { - CHECK_EXPECTED(cmd, DoubleMarker); - double dbl; - buf.ReadMany((char*)&dbl, sizeof(dbl)); - return dbl; - } - +T ReadYsonFloatNumber(char cmd, TInputBuf& buf) { return ReadNextSerializedNumber<T>(cmd, buf); } -NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf, bool isTableFormat) { +NUdf::TUnboxedValue ReadYsonValue(TType* type, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf) { switch (type->GetKind()) { case TType::EKind::Variant: { auto varType = static_cast<TVariantType*>(type); auto underlyingType = varType->GetUnderlyingType(); - if (isTableFormat && (nativeYtTypeFlags & NTCF_COMPLEX)) { - CHECK_EXPECTED(cmd, BeginListSymbol); - cmd = buf.Read(); - TType* type = nullptr; - i64 index = 0; - if (cmd == StringMarker) { - YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); - auto structType = static_cast<TStructType*>(underlyingType); - auto nameBuffer = ReadNextString(cmd, buf); - auto foundIndex = structType->FindMemberIndex(nameBuffer); - YQL_ENSURE(foundIndex.Defined(), "Unexpected member: " << nameBuffer); - index = *foundIndex; - type = varType->GetAlternativeType(index); - } else { - YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker); - YQL_ENSURE(underlyingType->IsTuple(), "Expected tuple as underlying type"); - if (cmd == Uint64Marker) { - index = buf.ReadVarUI64(); - } else { - index = buf.ReadVarI64(); - } - YQL_ENSURE(0 <= index && index < varType->GetAlternativesCount(), "Unexpected member index: " << index); - type = varType->GetAlternativeType(index); - } - cmd = buf.Read(); - CHECK_EXPECTED(cmd, ListItemSeparatorSymbol); - cmd = buf.Read(); - auto value = ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); - cmd = buf.Read(); - if (cmd != EndListSymbol) { - CHECK_EXPECTED(cmd, ListItemSeparatorSymbol); - cmd = buf.Read(); - CHECK_EXPECTED(cmd, EndListSymbol); - } - return holderFactory.CreateVariantHolder(value.Release(), index); - } else { - if (cmd == StringMarker) { - YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); - auto name = ReadNextString(cmd, buf); - auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name); - YQL_ENSURE(index, "Unexpected member: " << name); - YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type"); - return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index); - } - - CHECK_EXPECTED(cmd, BeginListSymbol); - cmd = buf.Read(); - i64 index = 0; - if (isTableFormat) { - YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker); - if (cmd == Uint64Marker) { - index = buf.ReadVarUI64(); - } else { - index = buf.ReadVarI64(); - } - } else { - if (cmd == BeginListSymbol) { - cmd = buf.Read(); - YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); - auto name = ReadNextString(cmd, buf); - auto foundIndex = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name); - YQL_ENSURE(foundIndex, "Unexpected member: " << name); - index = *foundIndex; - cmd = buf.Read(); - if (cmd == ListItemSeparatorSymbol) { - cmd = buf.Read(); - } - - CHECK_EXPECTED(cmd, EndListSymbol); - } else { - index = ReadNextSerializedNumber<ui64>(cmd, buf); - } - } - - YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << - varType->GetAlternativesCount() << " are available"); - YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type"); - TType* itemType; - if (underlyingType->IsTuple()) { - itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index); - } - else { - itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index); - } + if (cmd == StringMarker) { + YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); + auto name = ReadNextString(cmd, buf); + auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name); + YQL_ENSURE(index, "Unexpected member: " << name); + YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type"); + return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index); + } - EXPECTED(buf, ListItemSeparatorSymbol); + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + i64 index = 0; + if (cmd == BeginListSymbol) { cmd = buf.Read(); - auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); + auto name = ReadNextString(cmd, buf); + auto foundIndex = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name); + YQL_ENSURE(foundIndex, "Unexpected member: " << name); + index = *foundIndex; cmd = buf.Read(); if (cmd == ListItemSeparatorSymbol) { cmd = buf.Read(); } CHECK_EXPECTED(cmd, EndListSymbol); - return holderFactory.CreateVariantHolder(value.Release(), index); + } else { + index = ReadNextSerializedNumber<ui64>(cmd, buf); } + + YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << + varType->GetAlternativesCount() << " are available"); + YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type"); + TType* itemType; + if (underlyingType->IsTuple()) { + itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index); + } + else { + itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index); + } + + EXPECTED(buf, ListItemSeparatorSymbol); + cmd = buf.Read(); + auto value = ReadYsonValue(itemType, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + + CHECK_EXPECTED(cmd, EndListSymbol); + return holderFactory.CreateVariantHolder(value.Release(), index); } case TType::EKind::Data: { @@ -907,117 +852,49 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, return NUdf::TUnboxedValuePod(cmd == TrueMarker); case NUdf::TDataType<ui8>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod(ui8(buf.ReadVarUI64())); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui8>(cmd, buf)); case NUdf::TDataType<i8>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod(i8(buf.ReadVarI64())); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i8>(cmd, buf)); case NUdf::TDataType<ui16>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod(ui16(buf.ReadVarUI64())); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui16>(cmd, buf)); case NUdf::TDataType<i16>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod(i16(buf.ReadVarI64())); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i16>(cmd, buf)); case NUdf::TDataType<i32>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod(i32(buf.ReadVarI64())); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i32>(cmd, buf)); case NUdf::TDataType<ui32>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod(ui32(buf.ReadVarUI64())); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui32>(cmd, buf)); case NUdf::TDataType<i64>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod(buf.ReadVarI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i64>(cmd, buf)); case NUdf::TDataType<ui64>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod(buf.ReadVarUI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui64>(cmd, buf)); case NUdf::TDataType<float>::Id: - return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<float>(cmd, buf, isTableFormat)); + return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<float>(cmd, buf)); case NUdf::TDataType<double>::Id: - return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<double>(cmd, buf, isTableFormat)); + return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<double>(cmd, buf)); case NUdf::TDataType<NUdf::TUtf8>::Id: case NUdf::TDataType<char*>::Id: case NUdf::TDataType<NUdf::TJson>::Id: case NUdf::TDataType<NUdf::TDyNumber>::Id: case NUdf::TDataType<NUdf::TUuid>::Id: { - if (isTableFormat) { - auto nextString = ReadNextString(cmd, buf); - return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(nextString))); - } - return ReadYsonStringInResultFormat(cmd, buf); } case NUdf::TDataType<NUdf::TDecimal>::Id: { auto nextString = ReadNextString(cmd, buf); - if (isTableFormat) { - if (nativeYtTypeFlags & NTCF_DECIMAL) { - auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); - if (params.first < 10) { - // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf - NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary32(params.first, nextString)); - YQL_ENSURE(!NDecimal::IsError(res)); - return NUdf::TUnboxedValuePod(res); - } else if (params.first < 19) { - NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary64(params.first, nextString)); - YQL_ENSURE(!NDecimal::IsError(res)); - return NUdf::TUnboxedValuePod(res); - } else { - YQL_ENSURE(params.first < 36); - NYT::NDecimal::TDecimal::TValue128 tmpRes = NYT::NDecimal::TDecimal::ParseBinary128(params.first, nextString); - NDecimal::TInt128 res; - static_assert(sizeof(NDecimal::TInt128) == sizeof(NYT::NDecimal::TDecimal::TValue128)); - memcpy(&res, &tmpRes, sizeof(NDecimal::TInt128)); - res = NDecimal::FromYtDecimal(res); - YQL_ENSURE(!NDecimal::IsError(res)); - return NUdf::TUnboxedValuePod(res); - } - } - else { - const auto& des = NDecimal::Deserialize(nextString.data(), nextString.size()); - YQL_ENSURE(!NDecimal::IsError(des.first)); - YQL_ENSURE(nextString.size() == des.second); - return NUdf::TUnboxedValuePod(des.first); - } - } else { - const auto params = static_cast<TDataDecimalType*>(type)->GetParams(); - const auto val = NDecimal::FromString(nextString, params.first, params.second); - YQL_ENSURE(!NDecimal::IsError(val)); - return NUdf::TUnboxedValuePod(val); - } + const auto params = static_cast<TDataDecimalType*>(type)->GetParams(); + const auto val = NDecimal::FromString(nextString, params.first, params.second); + YQL_ENSURE(!NDecimal::IsError(val)); + return NUdf::TUnboxedValuePod(val); } case NUdf::TDataType<NUdf::TYson>::Id: { @@ -1025,114 +902,55 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, yson.clear(); CopyYsonWithAttrs(cmd, buf, yson); - if (isTableFormat) { - return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(yson))); - } - TString decodedYson = NResult::DecodeRestrictedYson(TStringBuf(yson.data(), yson.size()), NYson::EYsonFormat::Text); return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(decodedYson))); } case NUdf::TDataType<NUdf::TDate>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod((ui16)buf.ReadVarUI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui16>(cmd, buf)); case NUdf::TDataType<NUdf::TDatetime>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod((ui32)buf.ReadVarUI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui32>(cmd, buf)); case NUdf::TDataType<NUdf::TTimestamp>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Uint64Marker); - return NUdf::TUnboxedValuePod(buf.ReadVarUI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui64>(cmd, buf)); case NUdf::TDataType<NUdf::TInterval>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod(buf.ReadVarI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i64>(cmd, buf)); case NUdf::TDataType<NUdf::TTzDate>::Id: { auto nextString = ReadNextString(cmd, buf); NUdf::TUnboxedValuePod data; - if (isTableFormat) { - ui16 value; - ui16 tzId = 0; - YQL_ENSURE(DeserializeTzDate(nextString, value, tzId)); - data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - } else { - data = ValueFromString(NUdf::EDataSlot::TzDate, nextString); - YQL_ENSURE(data, "incorrect tz date format for value " << nextString); - } - + data = ValueFromString(NUdf::EDataSlot::TzDate, nextString); + YQL_ENSURE(data, "incorrect tz date format for value " << nextString); return data; } case NUdf::TDataType<NUdf::TTzDatetime>::Id: { auto nextString = ReadNextString(cmd, buf); NUdf::TUnboxedValuePod data; - if (isTableFormat) { - ui32 value; - ui16 tzId = 0; - YQL_ENSURE(DeserializeTzDatetime(nextString, value, tzId)); - data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - } else { - data = ValueFromString(NUdf::EDataSlot::TzDatetime, nextString); - YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString); - } - + data = ValueFromString(NUdf::EDataSlot::TzDatetime, nextString); + YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString); return data; } case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { auto nextString = ReadNextString(cmd, buf); NUdf::TUnboxedValuePod data; - if (isTableFormat) { - ui64 value; - ui16 tzId = 0; - YQL_ENSURE(DeserializeTzTimestamp(nextString, value, tzId)); - data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - } else { - data = ValueFromString(NUdf::EDataSlot::TzTimestamp, nextString); - YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString); - } - + data = ValueFromString(NUdf::EDataSlot::TzTimestamp, nextString); + YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString); return data; } case NUdf::TDataType<NUdf::TDate32>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod((i32)buf.ReadVarI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i32>(cmd, buf)); case NUdf::TDataType<NUdf::TDatetime64>::Id: case NUdf::TDataType<NUdf::TTimestamp64>::Id: case NUdf::TDataType<NUdf::TInterval64>::Id: - if (isTableFormat) { - CHECK_EXPECTED(cmd, Int64Marker); - return NUdf::TUnboxedValuePod(buf.ReadVarI64()); - } return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i64>(cmd, buf)); case NUdf::TDataType<NUdf::TJsonDocument>::Id: { - if (isTableFormat) { - return ValueFromString(EDataSlot::JsonDocument, ReadNextString(cmd, buf)); - } - const auto json = ReadYsonStringInResultFormat(cmd, buf); return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef()); } @@ -1140,51 +958,24 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, case NUdf::TDataType<NUdf::TTzDate32>::Id: { auto nextString = ReadNextString(cmd, buf); NUdf::TUnboxedValuePod data; - if (isTableFormat) { - i32 value; - ui16 tzId = 0; - YQL_ENSURE(DeserializeTzDate32(nextString, value, tzId)); - data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - } else { - data = ValueFromString(NUdf::EDataSlot::TzDate32, nextString); - YQL_ENSURE(data, "incorrect tz date format for value " << nextString); - } - + data = ValueFromString(NUdf::EDataSlot::TzDate32, nextString); + YQL_ENSURE(data, "incorrect tz date format for value " << nextString); return data; } case NUdf::TDataType<NUdf::TTzDatetime64>::Id: { auto nextString = ReadNextString(cmd, buf); NUdf::TUnboxedValuePod data; - if (isTableFormat) { - i64 value; - ui16 tzId = 0; - YQL_ENSURE(DeserializeTzDatetime64(nextString, value, tzId)); - data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - } else { - data = ValueFromString(NUdf::EDataSlot::TzDatetime64, nextString); - YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString); - } - + data = ValueFromString(NUdf::EDataSlot::TzDatetime64, nextString); + YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString); return data; } case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: { auto nextString = ReadNextString(cmd, buf); NUdf::TUnboxedValuePod data; - if (isTableFormat) { - i64 value; - ui16 tzId = 0; - YQL_ENSURE(DeserializeTzTimestamp64(nextString, value, tzId)); - data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - } else { - data = ValueFromString(NUdf::EDataSlot::TzTimestamp64, nextString); - YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString); - } - + data = ValueFromString(NUdf::EDataSlot::TzTimestamp64, nextString); + YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString); return data; } @@ -1202,7 +993,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, cmd = buf.Read(); for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - items[i] = ReadYsonValue(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + items[i] = ReadYsonValue(structType->GetMemberType(i), holderFactory, cmd, buf); cmd = buf.Read(); if (cmd == ListItemSeparatorSymbol) { @@ -1227,11 +1018,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, if (pos && cmd != '#') { auto memberType = structType->GetMemberType(*pos); auto unwrappedType = memberType; - if (!(nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) && isTableFormat && unwrappedType->IsOptional()) { - unwrappedType = static_cast<TOptionalType*>(unwrappedType)->GetItemType(); - } - - items[*pos] = ReadYsonValue(unwrappedType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + items[*pos] = ReadYsonValue(unwrappedType, holderFactory, cmd, buf); } else { SkipYson(cmd, buf); } @@ -1265,7 +1052,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, break; } - items = items.Append(ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat)); + items = items.Append(ReadYsonValue(itemType, holderFactory, cmd, buf)); cmd = buf.Read(); if (cmd == ListItemSeparatorSymbol) { @@ -1281,40 +1068,24 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, return NUdf::TUnboxedValuePod(); } auto itemType = static_cast<TOptionalType*>(type)->GetItemType(); - if (isTableFormat && (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX)) { - if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { - CHECK_EXPECTED(cmd, BeginListSymbol); - cmd = buf.Read(); - auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); - cmd = buf.Read(); - if (cmd == ListItemSeparatorSymbol) { - cmd = buf.Read(); - } - CHECK_EXPECTED(cmd, EndListSymbol); - return value.Release().MakeOptional(); - } else { - return ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat).Release().MakeOptional(); - } - } else { - if (cmd != BeginListSymbol) { - auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); - return value.Release().MakeOptional(); - } + if (cmd != BeginListSymbol) { + auto value = ReadYsonValue(itemType, holderFactory, cmd, buf); + return value.Release().MakeOptional(); + } - cmd = buf.Read(); - if (cmd == EndListSymbol) { - return NUdf::TUnboxedValuePod(); - } + cmd = buf.Read(); + if (cmd == EndListSymbol) { + return NUdf::TUnboxedValuePod(); + } - auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + auto value = ReadYsonValue(itemType, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { cmd = buf.Read(); - if (cmd == ListItemSeparatorSymbol) { - cmd = buf.Read(); - } - - CHECK_EXPECTED(cmd, EndListSymbol); - return value.Release().MakeOptional(); } + + CHECK_EXPECTED(cmd, EndListSymbol); + return value.Release().MakeOptional(); } case TType::EKind::Dict: { @@ -1353,7 +1124,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, auto keyStr = NUdf::TUnboxedValue(MakeString(keyBuffer)); EXPECTED(buf, KeyValueSeparatorSymbol); cmd = buf.Read(); - auto payload = ReadYsonValue(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + auto payload = ReadYsonValue(payloadType, holderFactory, cmd, buf); map.emplace(std::move(keyStr), std::move(payload)); cmd = buf.Read(); @@ -1378,10 +1149,10 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, CHECK_EXPECTED(cmd, BeginListSymbol); cmd = buf.Read(); - auto key = ReadYsonValue(keyType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + auto key = ReadYsonValue(keyType, holderFactory, cmd, buf); EXPECTED(buf, ListItemSeparatorSymbol); cmd = buf.Read(); - auto payload = ReadYsonValue(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + auto payload = ReadYsonValue(payloadType, holderFactory, cmd, buf); cmd = buf.Read(); if (cmd == ListItemSeparatorSymbol) { cmd = buf.Read(); @@ -1416,7 +1187,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, cmd = buf.Read(); for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - items[i] = ReadYsonValue(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + items[i] = ReadYsonValue(tupleType->GetElementType(i), holderFactory, cmd, buf); cmd = buf.Read(); if (cmd == ListItemSeparatorSymbol) { @@ -1466,12 +1237,12 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, case TType::EKind::Pg: { auto pgType = static_cast<TPgType*>(type); - return isTableFormat ? ReadYsonValueInTableFormatPg(pgType, cmd, buf) : ReadYsonValuePg(pgType, cmd, buf); + return ReadYsonValuePg(pgType, cmd, buf); } case TType::EKind::Tagged: { auto taggedType = static_cast<TTaggedType*>(type); - return ReadYsonValue(taggedType->GetBaseType(), nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + return ReadYsonValue(taggedType->GetBaseType(), holderFactory, cmd, buf); } default: @@ -1480,7 +1251,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags, } TMaybe<NUdf::TUnboxedValue> ParseYsonValue(const THolderFactory& holderFactory, - const TStringBuf& yson, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err, bool isTableFormat) { + const TStringBuf& yson, TType* type, IOutputStream* err) { try { class TReader : public IBlockReader { public: @@ -1520,7 +1291,7 @@ TMaybe<NUdf::TUnboxedValue> ParseYsonValue(const THolderFactory& holderFactory, TReader reader(yson); TInputBuf buf(reader, nullptr); char cmd = buf.Read(); - return ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat); + return ReadYsonValue(type, holderFactory, cmd, buf); } catch (const yexception& e) { if (err) { @@ -1530,1293 +1301,9 @@ TMaybe<NUdf::TUnboxedValue> ParseYsonValue(const THolderFactory& holderFactory, } } -TMaybe<NUdf::TUnboxedValue> ParseYsonNode(const THolderFactory& holderFactory, - const NYT::TNode& node, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err) { - return ParseYsonValue(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, nativeYtTypeFlags, err, true); -} - TMaybe<NUdf::TUnboxedValue> ParseYsonNodeInResultFormat(const THolderFactory& holderFactory, const NYT::TNode& node, TType* type, IOutputStream* err) { - return ParseYsonValue(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, 0, err, false); -} - -extern "C" void ReadYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - NUdf::TUnboxedValue& value, TInputBuf& buf, bool wrapOptional) { - // yson content - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - // parse binary yson... - YQL_ENSURE(size > 0); - char cmd = buf.Read(); - auto tmp = ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, true); - if (!wrapOptional) { - value = std::move(tmp); - } - else { - value = tmp.Release().MakeOptional(); - } -} - -NUdf::TUnboxedValue ReadSkiffData(TType* type, ui64 nativeYtTypeFlags, TInputBuf& buf) { - auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); - switch (schemeType) { - case NUdf::TDataType<bool>::Id: { - ui8 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(data != 0); - } - - case NUdf::TDataType<ui8>::Id: { - ui64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(ui8(data)); - } - - case NUdf::TDataType<i8>::Id: { - i64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(i8(data)); - } - - case NUdf::TDataType<NUdf::TDate>::Id: - case NUdf::TDataType<ui16>::Id: { - ui64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(ui16(data)); - } - - case NUdf::TDataType<i16>::Id: { - i64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(i16(data)); - } - - case NUdf::TDataType<NUdf::TDate32>::Id: - case NUdf::TDataType<i32>::Id: { - i64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(i32(data)); - } - - case NUdf::TDataType<NUdf::TDatetime>::Id: - case NUdf::TDataType<ui32>::Id: { - ui64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(ui32(data)); - } - - case NUdf::TDataType<NUdf::TInterval>::Id: - case NUdf::TDataType<NUdf::TInterval64>::Id: - case NUdf::TDataType<NUdf::TDatetime64>::Id: - case NUdf::TDataType<NUdf::TTimestamp64>::Id: - case NUdf::TDataType<i64>::Id: { - i64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(data); - } - - case NUdf::TDataType<NUdf::TTimestamp>::Id: - case NUdf::TDataType<ui64>::Id: { - ui64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(data); - } - - case NUdf::TDataType<float>::Id: { - double data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(float(data)); - } - - case NUdf::TDataType<double>::Id: { - double data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(data); - } - - case NUdf::TDataType<NUdf::TUtf8>::Id: - case NUdf::TDataType<char*>::Id: - case NUdf::TDataType<NUdf::TJson>::Id: - case NUdf::TDataType<NUdf::TYson>::Id: - case NUdf::TDataType<NUdf::TDyNumber>::Id: - case NUdf::TDataType<NUdf::TUuid>::Id: { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - auto str = NUdf::TUnboxedValue(MakeStringNotFilled(size)); - buf.ReadMany(str.AsStringRef().Data(), size); - return str; - } - - case NUdf::TDataType<NUdf::TDecimal>::Id: { - if (nativeYtTypeFlags & NTCF_DECIMAL) { - auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); - if (params.first < 10) { - i32 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data)); - } else if (params.first < 19) { - i64 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data)); - } else { - YQL_ENSURE(params.first < 36); - NDecimal::TInt128 data; - buf.ReadMany((char*)&data, sizeof(data)); - return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data)); - } - } else { - ui32 size; - buf.ReadMany(reinterpret_cast<char*>(&size), sizeof(size)); - const auto maxSize = sizeof(NDecimal::TInt128); - YQL_ENSURE(size > 0U && size <= maxSize, "Bad decimal field size: " << size); - char data[maxSize]; - buf.ReadMany(data, size); - const auto& v = NDecimal::Deserialize(data, size); - YQL_ENSURE(!NDecimal::IsError(v.first), "Bad decimal field data: " << data); - YQL_ENSURE(size == v.second, "Bad decimal field size: " << size); - return NUdf::TUnboxedValuePod(v.first); - } - } - - case NUdf::TDataType<NUdf::TTzDate>::Id: { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - auto& vec = buf.YsonBuffer(); - vec.resize(size); - buf.ReadMany(vec.data(), size); - ui16 value; - ui16 tzId; - YQL_ENSURE(DeserializeTzDate(TStringBuf(vec.begin(), vec.end()), value, tzId)); - auto data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - return data; - } - - case NUdf::TDataType<NUdf::TTzDatetime>::Id: { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - auto& vec = buf.YsonBuffer(); - vec.resize(size); - buf.ReadMany(vec.data(), size); - ui32 value; - ui16 tzId; - YQL_ENSURE(DeserializeTzDatetime(TStringBuf(vec.begin(), vec.end()), value, tzId)); - auto data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - return data; - } - - case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - auto& vec = buf.YsonBuffer(); - vec.resize(size); - buf.ReadMany(vec.data(), size); - ui64 value; - ui16 tzId; - YQL_ENSURE(DeserializeTzTimestamp(TStringBuf(vec.begin(), vec.end()), value, tzId)); - auto data = NUdf::TUnboxedValuePod(value); - data.SetTimezoneId(tzId); - return data; - } - - case NUdf::TDataType<NUdf::TJsonDocument>::Id: { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - auto json = NUdf::TUnboxedValue(MakeStringNotFilled(size)); - buf.ReadMany(json.AsStringRef().Data(), size); - return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef()); - } - - default: - YQL_ENSURE(false, "Unsupported data type: " << schemeType); - } -} - -void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, TInputBuf& buf) { - const bool isOptional = type->IsOptional(); - if (isOptional) { - // Unwrap optional - type = static_cast<TOptionalType*>(type)->GetItemType(); - } - - if (isOptional) { - auto marker = buf.Read(); - if (!marker) { - return; - } - } - - if (type->IsData()) { - auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); - switch (schemeType) { - case NUdf::TDataType<bool>::Id: - buf.SkipMany(sizeof(ui8)); - break; - - case NUdf::TDataType<ui8>::Id: - case NUdf::TDataType<ui16>::Id: - case NUdf::TDataType<ui32>::Id: - case NUdf::TDataType<ui64>::Id: - case NUdf::TDataType<NUdf::TDate>::Id: - case NUdf::TDataType<NUdf::TDatetime>::Id: - case NUdf::TDataType<NUdf::TTimestamp>::Id: - buf.SkipMany(sizeof(ui64)); - break; - - case NUdf::TDataType<i8>::Id: - case NUdf::TDataType<i16>::Id: - case NUdf::TDataType<i32>::Id: - case NUdf::TDataType<i64>::Id: - case NUdf::TDataType<NUdf::TInterval>::Id: - case NUdf::TDataType<NUdf::TDate32>::Id: - case NUdf::TDataType<NUdf::TDatetime64>::Id: - case NUdf::TDataType<NUdf::TTimestamp64>::Id: - case NUdf::TDataType<NUdf::TInterval64>::Id: - buf.SkipMany(sizeof(i64)); - break; - - case NUdf::TDataType<float>::Id: - case NUdf::TDataType<double>::Id: - buf.SkipMany(sizeof(double)); - break; - - case NUdf::TDataType<NUdf::TUtf8>::Id: - case NUdf::TDataType<char*>::Id: - case NUdf::TDataType<NUdf::TJson>::Id: - case NUdf::TDataType<NUdf::TYson>::Id: - case NUdf::TDataType<NUdf::TUuid>::Id: - case NUdf::TDataType<NUdf::TDyNumber>::Id: - case NUdf::TDataType<NUdf::TTzDate>::Id: - case NUdf::TDataType<NUdf::TTzDatetime>::Id: - case NUdf::TDataType<NUdf::TTzTimestamp>::Id: - case NUdf::TDataType<NUdf::TJsonDocument>::Id: { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - buf.SkipMany(size); - break; - } - case NUdf::TDataType<NUdf::TDecimal>::Id: { - if (nativeYtTypeFlags & NTCF_DECIMAL) { - auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); - if (params.first < 10) { - buf.SkipMany(sizeof(i32)); - } else if (params.first < 19) { - buf.SkipMany(sizeof(i64)); - } else { - buf.SkipMany(sizeof(NDecimal::TInt128)); - } - } else { - ui32 size; - buf.ReadMany((char*)&size, sizeof(size)); - CHECK_STRING_LENGTH_UNSIGNED(size); - buf.SkipMany(size); - } - break; - } - default: - YQL_ENSURE(false, "Unsupported data type: " << schemeType); - } - return; - } - - if (type->IsPg()) { - SkipSkiffPg(static_cast<TPgType*>(type), buf); - return; - } - - if (type->IsStruct()) { - auto structType = static_cast<TStructType*>(type); - const std::vector<size_t>* reorder = nullptr; - if (auto cookie = structType->GetCookie()) { - reorder = ((const std::vector<size_t>*)cookie); - } - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - SkipSkiffField(structType->GetMemberType(reorder ? reorder->at(i) : i), nativeYtTypeFlags, buf); - } - return; - } - - if (type->IsList()) { - auto itemType = static_cast<TListType*>(type)->GetItemType(); - while (buf.Read() == '\0') { - SkipSkiffField(itemType, nativeYtTypeFlags, buf); - } - return; - } - - if (type->IsTuple()) { - auto tupleType = static_cast<TTupleType*>(type); - - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - SkipSkiffField(tupleType->GetElementType(i), nativeYtTypeFlags, buf); - } - return; - } - - if (type->IsVariant()) { - auto varType = AS_TYPE(TVariantType, type); - ui16 data = 0; - if (varType->GetAlternativesCount() < 256) { - buf.ReadMany((char*)&data, 1); - } else { - buf.ReadMany((char*)&data, sizeof(data)); - } - - if (varType->GetUnderlyingType()->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType()); - YQL_ENSURE(data < tupleType->GetElementsCount()); - SkipSkiffField(tupleType->GetElementType(data), nativeYtTypeFlags, buf); - } else { - auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType()); - if (auto cookie = structType->GetCookie()) { - const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); - data = reorder[data]; - } - YQL_ENSURE(data < structType->GetMembersCount()); - - SkipSkiffField(structType->GetMemberType(data), nativeYtTypeFlags, buf); - } - return; - } - - if (type->IsVoid()) { - return; - } - - if (type->IsNull()) { - return; - } - - if (type->IsEmptyList() || type->IsEmptyDict()) { - return; - } - - if (type->IsDict()) { - auto dictType = AS_TYPE(TDictType, type); - auto keyType = dictType->GetKeyType(); - auto payloadType = dictType->GetPayloadType(); - while (buf.Read() == '\0') { - SkipSkiffField(keyType, nativeYtTypeFlags, buf); - SkipSkiffField(payloadType, nativeYtTypeFlags, buf); - } - return; - } - - YQL_ENSURE(false, "Unsupported type for skip: " << type->GetKindAsStr()); -} - -NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, TInputBuf& buf) -{ - if (type->IsData()) { - return ReadSkiffData(type, nativeYtTypeFlags, buf); - } - - if (type->IsPg()) { - return ReadSkiffPg(static_cast<TPgType*>(type), buf); - } - - if (type->IsOptional()) { - auto marker = buf.Read(); - if (!marker) { - return NUdf::TUnboxedValue(); - } - - auto value = ReadSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, holderFactory, buf); - return value.Release().MakeOptional(); - } - - if (type->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, type); - NUdf::TUnboxedValue* items; - auto value = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), items); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - items[i] = ReadSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, buf); - } - - return value; - } - - if (type->IsStruct()) { - auto structType = AS_TYPE(TStructType, type); - NUdf::TUnboxedValue* items; - auto value = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), items); - - if (auto cookie = type->GetCookie()) { - const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - const auto ndx = reorder[i]; - items[ndx] = ReadSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, holderFactory, buf); - } - } else { - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - items[i] = ReadSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, buf); - } - } - - return value; - } - - if (type->IsList()) { - auto itemType = AS_TYPE(TListType, type)->GetItemType(); - TDefaultListRepresentation items; - while (buf.Read() == '\0') { - items = items.Append(ReadSkiffNativeYtValue(itemType, nativeYtTypeFlags, holderFactory, buf)); - } - - return holderFactory.CreateDirectListHolder(std::move(items)); - } - - if (type->IsVariant()) { - auto varType = AS_TYPE(TVariantType, type); - ui16 data = 0; - if (varType->GetAlternativesCount() < 256) { - buf.ReadMany((char*)&data, 1); - } else { - buf.ReadMany((char*)&data, sizeof(data)); - } - if (varType->GetUnderlyingType()->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType()); - YQL_ENSURE(data < tupleType->GetElementsCount()); - auto item = ReadSkiffNativeYtValue(tupleType->GetElementType(data), nativeYtTypeFlags, holderFactory, buf); - return holderFactory.CreateVariantHolder(item.Release(), data); - } - else { - auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType()); - if (auto cookie = structType->GetCookie()) { - const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); - data = reorder[data]; - } - YQL_ENSURE(data < structType->GetMembersCount()); - - auto item = ReadSkiffNativeYtValue(structType->GetMemberType(data), nativeYtTypeFlags, holderFactory, buf); - return holderFactory.CreateVariantHolder(item.Release(), data); - } - } - - if (type->IsVoid()) { - return NUdf::TUnboxedValue::Zero(); - } - - if (type->IsNull()) { - return NUdf::TUnboxedValue(); - } - - if (type->IsEmptyList() || type->IsEmptyDict()) { - return holderFactory.GetEmptyContainerLazy(); - } - - if (type->IsDict()) { - auto dictType = AS_TYPE(TDictType, type); - auto keyType = dictType->GetKeyType(); - auto payloadType = dictType->GetPayloadType(); - - auto builder = holderFactory.NewDict(dictType, NUdf::TDictFlags::EDictKind::Hashed); - while (buf.Read() == '\0') { - auto key = ReadSkiffNativeYtValue(keyType, nativeYtTypeFlags, holderFactory, buf); - auto payload = ReadSkiffNativeYtValue(payloadType, nativeYtTypeFlags, holderFactory, buf); - builder->Add(std::move(key), std::move(payload)); - } - - return builder->Build(); - } - - YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); -} - -extern "C" void ReadContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - NUdf::TUnboxedValue& value, TInputBuf& buf, bool wrapOptional) { - auto tmp = ReadSkiffNativeYtValue(type, nativeYtTypeFlags, holderFactory, buf); - if (!wrapOptional) { - value = std::move(tmp); - } else { - value = tmp.Release().MakeOptional(); - } -} - -/////////////////////////////////////////// -// -// Initial state first = last = &dummy -// -// +1 block first = &dummy, last = newPage, first.next = newPage, newPage.next= &dummy -// +1 block first = &dummy, last = newPage2, first.next = newPage, newPage.next = newPage2, newPage2.next = &dummy -// -/////////////////////////////////////////// -class TTempBlockWriter : public NCommon::IBlockWriter { -public: - TTempBlockWriter() - : Pool_(*TlsAllocState) - , Last_(&Dummy_) - { - Dummy_.Avail_ = 0; - Dummy_.Next_ = &Dummy_; - } - - ~TTempBlockWriter() { - auto current = Dummy_.Next_; // skip dummy node - while (current != &Dummy_) { - auto next = current->Next_; - Pool_.ReturnPage(current); - current = next; - } - } - - void SetRecordBoundaryCallback(std::function<void()> callback) override { - Y_UNUSED(callback); - } - - void WriteBlocks(TOutputBuf& buf) const { - auto current = Dummy_.Next_; // skip dummy node - while (current != &Dummy_) { - auto next = current->Next_; - buf.WriteMany((const char*)(current + 1), current->Avail_); - current = next; - } - } - - TTempBlockWriter(const TTempBlockWriter&) = delete; - void operator=(const TTempBlockWriter&) = delete; - - std::pair<char*, char*> NextEmptyBlock() override { - auto newPage = Pool_.GetPage(); - auto header = (TPageHeader*)newPage; - header->Avail_ = 0; - header->Next_ = &Dummy_; - Last_->Next_ = header; - Last_ = header; - return std::make_pair((char*)(header + 1), (char*)newPage + TAlignedPagePool::POOL_PAGE_SIZE); - } - - void ReturnBlock(size_t avail, std::optional<size_t> lastRecordBoundary) override { - Y_UNUSED(lastRecordBoundary); - YQL_ENSURE(avail <= TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TPageHeader)); - Last_->Avail_ = avail; - } - - void Finish() override { - } - -private: - struct TPageHeader { - TPageHeader* Next_ = nullptr; - ui32 Avail_ = 0; - }; - - NKikimr::TAlignedPagePool& Pool_; - TPageHeader* Last_; - TPageHeader Dummy_; -}; - -void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, bool topLevel) { - // Table format, very compact - switch (type->GetKind()) { - case TType::EKind::Variant: { - buf.Write(BeginListSymbol); - auto varType = static_cast<TVariantType*>(type); - auto underlyingType = varType->GetUnderlyingType(); - auto index = value.GetVariantIndex(); - YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << varType->GetAlternativesCount() << " are available"); - YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type"); - TType* itemType; - if (underlyingType->IsTuple()) { - itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index); - } - else { - itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index); - } - if (!(nativeYtTypeFlags & NTCF_COMPLEX) || underlyingType->IsTuple()) { - buf.Write(Uint64Marker); - buf.WriteVarUI64(index); - } else { - auto structType = static_cast<TStructType*>(underlyingType); - auto varName = structType->GetMemberName(index); - buf.Write(StringMarker); - buf.WriteVarI32(varName.size()); - buf.WriteMany(varName); - } - buf.Write(ListItemSeparatorSymbol); - WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetVariantItem(), false); - buf.Write(ListItemSeparatorSymbol); - buf.Write(EndListSymbol); - break; - } - - case TType::EKind::Data: { - auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); - switch (schemeType) { - case NUdf::TDataType<bool>::Id: { - buf.Write(value.Get<bool>() ? TrueMarker : FalseMarker); - break; - } - - case NUdf::TDataType<ui8>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui8>()); - break; - - case NUdf::TDataType<i8>::Id: - buf.Write(Int64Marker); - buf.WriteVarI64(value.Get<i8>()); - break; - - case NUdf::TDataType<ui16>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui16>()); - break; - - case NUdf::TDataType<i16>::Id: - buf.Write(Int64Marker); - buf.WriteVarI64(value.Get<i16>()); - break; - - case NUdf::TDataType<i32>::Id: - buf.Write(Int64Marker); - buf.WriteVarI64(value.Get<i32>()); - break; - - case NUdf::TDataType<ui32>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui32>()); - break; - - case NUdf::TDataType<i64>::Id: - buf.Write(Int64Marker); - buf.WriteVarI64(value.Get<i64>()); - break; - - case NUdf::TDataType<ui64>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui64>()); - break; - - case NUdf::TDataType<float>::Id: { - buf.Write(DoubleMarker); - double val = value.Get<float>(); - buf.WriteMany((const char*)&val, sizeof(val)); - break; - } - - case NUdf::TDataType<double>::Id: { - buf.Write(DoubleMarker); - double val = value.Get<double>(); - buf.WriteMany((const char*)&val, sizeof(val)); - break; - } - - case NUdf::TDataType<NUdf::TUtf8>::Id: - case NUdf::TDataType<char*>::Id: - case NUdf::TDataType<NUdf::TJson>::Id: - case NUdf::TDataType<NUdf::TDyNumber>::Id: - case NUdf::TDataType<NUdf::TUuid>::Id: { - buf.Write(StringMarker); - auto str = value.AsStringRef(); - buf.WriteVarI32(str.Size()); - buf.WriteMany(str); - break; - } - - case NUdf::TDataType<NUdf::TDecimal>::Id: { - buf.Write(StringMarker); - if (nativeYtTypeFlags & NTCF_DECIMAL){ - auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); - const NDecimal::TInt128 data128 = value.GetInt128(); - char tmpBuf[NYT::NDecimal::TDecimal::MaxBinarySize]; - if (params.first < 10) { - // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf - TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary32(params.first, NDecimal::ToYtDecimal<i32>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize); - buf.WriteVarI32(resBuf.size()); - buf.WriteMany(resBuf.data(), resBuf.size()); - } else if (params.first < 19) { - TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary64(params.first, NDecimal::ToYtDecimal<i64>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize); - buf.WriteVarI32(resBuf.size()); - buf.WriteMany(resBuf.data(), resBuf.size()); - } else { - YQL_ENSURE(params.first < 36); - NYT::NDecimal::TDecimal::TValue128 val; - auto data128Converted = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128); - memcpy(&val, &data128Converted, sizeof(val)); - auto resBuf = NYT::NDecimal::TDecimal::WriteBinary128(params.first, val, tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize); - buf.WriteVarI32(resBuf.size()); - buf.WriteMany(resBuf.data(), resBuf.size()); - } - } else { - char data[sizeof(NDecimal::TInt128)]; - const ui32 size = NDecimal::Serialize(value.GetInt128(), data); - buf.WriteVarI32(size); - buf.WriteMany(data, size); - } - break; - } - - case NUdf::TDataType<NUdf::TYson>::Id: { - // embed content - buf.WriteMany(value.AsStringRef()); - break; - } - - case NUdf::TDataType<NUdf::TDate>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui16>()); - break; - - case NUdf::TDataType<NUdf::TDatetime>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui32>()); - break; - - case NUdf::TDataType<NUdf::TTimestamp>::Id: - buf.Write(Uint64Marker); - buf.WriteVarUI64(value.Get<ui64>()); - break; - - case NUdf::TDataType<NUdf::TInterval>::Id: - case NUdf::TDataType<NUdf::TInterval64>::Id: - case NUdf::TDataType<NUdf::TDatetime64>::Id: - case NUdf::TDataType<NUdf::TTimestamp64>::Id: - buf.Write(Int64Marker); - buf.WriteVarI64(value.Get<i64>()); - break; - - case NUdf::TDataType<NUdf::TDate32>::Id: - buf.Write(Int64Marker); - buf.WriteVarI64(value.Get<i32>()); - break; - - case NUdf::TDataType<NUdf::TTzDate>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui16 data = SwapBytes(value.Get<ui16>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.Write(StringMarker); - buf.WriteVarI32(size); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzDatetime>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui32 data = SwapBytes(value.Get<ui32>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.Write(StringMarker); - buf.WriteVarI32(size); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui64 data = SwapBytes(value.Get<ui64>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.Write(StringMarker); - buf.WriteVarI32(size); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzDate32>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.Write(StringMarker); - buf.WriteVarI32(size); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzDatetime64>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.Write(StringMarker); - buf.WriteVarI32(size); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.Write(StringMarker); - buf.WriteVarI32(size); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TJsonDocument>::Id: { - buf.Write(StringMarker); - NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); - auto str = json.AsStringRef(); - buf.WriteVarI32(str.Size()); - buf.WriteMany(str); - break; - } - - default: - YQL_ENSURE(false, "Unsupported data type: " << schemeType); - } - - break; - } - - case TType::EKind::Struct: { - auto structType = static_cast<TStructType*>(type); - if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) { - buf.Write(BeginMapSymbol); - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - buf.Write(StringMarker); - auto key = structType->GetMemberName(i); - buf.WriteVarI32(key.size()); - buf.WriteMany(key); - buf.Write(KeyValueSeparatorSymbol); - WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false); - buf.Write(KeyedItemSeparatorSymbol); - } - buf.Write(EndMapSymbol); - } else { - buf.Write(BeginListSymbol); - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false); - buf.Write(ListItemSeparatorSymbol); - } - buf.Write(EndListSymbol); - } - break; - } - - case TType::EKind::List: { - auto itemType = static_cast<TListType*>(type)->GetItemType(); - const auto iter = value.GetListIterator(); - buf.Write(BeginListSymbol); - for (NUdf::TUnboxedValue item; iter.Next(item); buf.Write(ListItemSeparatorSymbol)) { - WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, item, false); - } - - buf.Write(EndListSymbol); - break; - } - - case TType::EKind::Optional: { - auto itemType = static_cast<TOptionalType*>(type)->GetItemType(); - if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) { - if (value) { - if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { - buf.Write(BeginListSymbol); - } - WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false); - if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { - buf.Write(ListItemSeparatorSymbol); - buf.Write(EndListSymbol); - } - } else { - buf.Write(EntitySymbol); - } - } else { - if (!value) { - if (topLevel) { - buf.Write(BeginListSymbol); - buf.Write(EndListSymbol); - } - else { - buf.Write(EntitySymbol); - } - } - else { - buf.Write(BeginListSymbol); - WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false); - buf.Write(ListItemSeparatorSymbol); - buf.Write(EndListSymbol); - } - } - break; - } - - case TType::EKind::Dict: { - auto dictType = static_cast<TDictType*>(type); - const auto iter = value.GetDictIterator(); - buf.Write(BeginListSymbol); - for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) { - buf.Write(BeginListSymbol); - WriteYsonValueInTableFormat(buf, dictType->GetKeyType(), nativeYtTypeFlags, key, false); - buf.Write(ListItemSeparatorSymbol); - WriteYsonValueInTableFormat(buf, dictType->GetPayloadType(), nativeYtTypeFlags, payload, false); - buf.Write(ListItemSeparatorSymbol); - buf.Write(EndListSymbol); - buf.Write(ListItemSeparatorSymbol); - } - - buf.Write(EndListSymbol); - break; - } - - case TType::EKind::Tuple: { - auto tupleType = static_cast<TTupleType*>(type); - buf.Write(BeginListSymbol); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - WriteYsonValueInTableFormat(buf, tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), false); - buf.Write(ListItemSeparatorSymbol); - } - - buf.Write(EndListSymbol); - break; - } - - case TType::EKind::Void: { - buf.Write(EntitySymbol); - break; - } - - case TType::EKind::Null: { - buf.Write(EntitySymbol); - break; - } - - case TType::EKind::EmptyList: { - buf.Write(BeginListSymbol); - buf.Write(EndListSymbol); - break; - } - - case TType::EKind::EmptyDict: { - buf.Write(BeginListSymbol); - buf.Write(EndListSymbol); - break; - } - - case TType::EKind::Pg: { - auto pgType = static_cast<TPgType*>(type); - WriteYsonValueInTableFormatPg(buf, pgType, value, topLevel); - break; - } - - default: - YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); - } -} - -extern "C" void WriteYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, TOutputBuf& buf) { - TTempBlockWriter blockWriter; - TOutputBuf ysonBuf(blockWriter, nullptr); - WriteYsonValueInTableFormat(ysonBuf, type, nativeYtTypeFlags, value, true); - ysonBuf.Flush(); - ui32 size = ysonBuf.GetWrittenBytes(); - buf.WriteMany((const char*)&size, sizeof(size)); - blockWriter.WriteBlocks(buf); -} - -extern "C" void WriteContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, TOutputBuf& buf) { - WriteSkiffNativeYtValue(type, nativeYtTypeFlags, value, buf); -} - -void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { - auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); - switch (schemeType) { - case NUdf::TDataType<bool>::Id: { - ui8 data = value.Get<ui8>(); - buf.Write(data); - break; - } - - case NUdf::TDataType<ui8>::Id: { - ui64 data = value.Get<ui8>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<i8>::Id: { - i64 data = value.Get<i8>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<NUdf::TDate>::Id: - case NUdf::TDataType<ui16>::Id: { - ui64 data = value.Get<ui16>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<i16>::Id: { - i64 data = value.Get<i16>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<NUdf::TDate32>::Id: - case NUdf::TDataType<i32>::Id: { - i64 data = value.Get<i32>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<NUdf::TDatetime>::Id: - case NUdf::TDataType<ui32>::Id: { - ui64 data = value.Get<ui32>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<NUdf::TInterval>::Id: - case NUdf::TDataType<NUdf::TInterval64>::Id: - case NUdf::TDataType<NUdf::TDatetime64>::Id: - case NUdf::TDataType<NUdf::TTimestamp64>::Id: - case NUdf::TDataType<i64>::Id: { - i64 data = value.Get<i64>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<NUdf::TTimestamp>::Id: - case NUdf::TDataType<ui64>::Id: { - ui64 data = value.Get<ui64>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<float>::Id: { - double data = value.Get<float>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<double>::Id: { - double data = value.Get<double>(); - buf.WriteMany((const char*)&data, sizeof(data)); - break; - } - - case NUdf::TDataType<NUdf::TUtf8>::Id: - case NUdf::TDataType<char*>::Id: - case NUdf::TDataType<NUdf::TJson>::Id: - case NUdf::TDataType<NUdf::TYson>::Id: - case NUdf::TDataType<NUdf::TDyNumber>::Id: - case NUdf::TDataType<NUdf::TUuid>::Id: { - auto str = value.AsStringRef(); - ui32 size = str.Size(); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany(str); - break; - } - - case NUdf::TDataType<NUdf::TDecimal>::Id: { - if (nativeYtTypeFlags & NTCF_DECIMAL) { - auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); - const NDecimal::TInt128 data128 = value.GetInt128(); - if (params.first < 10) { - auto data = NDecimal::ToYtDecimal<i32>(data128); - buf.WriteMany((const char*)&data, sizeof(data)); - } else if (params.first < 19) { - auto data = NDecimal::ToYtDecimal<i64>(data128); - buf.WriteMany((const char*)&data, sizeof(data)); - } else { - YQL_ENSURE(params.first < 36); - auto data = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128); - buf.WriteMany((const char*)&data, sizeof(data)); - } - } else { - char data[sizeof(NDecimal::TInt128)]; - const ui32 size = NDecimal::Serialize(value.GetInt128(), data); - buf.WriteMany(reinterpret_cast<const char*>(&size), sizeof(size)); - buf.WriteMany(data, size); - } - break; - } - - case NUdf::TDataType<NUdf::TTzDate>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui16 data = SwapBytes(value.Get<ui16>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzDatetime>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui32 data = SwapBytes(value.Get<ui32>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui64 data = SwapBytes(value.Get<ui64>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzDate32>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzDatetime64>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: { - ui16 tzId = SwapBytes(value.GetTimezoneId()); - ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); - ui32 size = sizeof(data) + sizeof(tzId); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany((const char*)&data, sizeof(data)); - buf.WriteMany((const char*)&tzId, sizeof(tzId)); - break; - } - - case NUdf::TDataType<NUdf::TJsonDocument>::Id: { - NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); - auto str = json.AsStringRef(); - ui32 size = str.Size(); - buf.WriteMany((const char*)&size, sizeof(size)); - buf.WriteMany(str); - break; - } - - default: - YQL_ENSURE(false, "Unsupported data type: " << schemeType); - } -} - -void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { - if (type->IsData()) { - WriteSkiffData(type, nativeYtTypeFlags, value, buf); - } else if (type->IsPg()) { - WriteSkiffPgValue(static_cast<TPgType*>(type), value, buf); - } else if (type->IsOptional()) { - if (!value) { - buf.Write('\0'); - return; - } - - buf.Write('\1'); - WriteSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, value.GetOptionalValue(), buf); - } else if (type->IsList()) { - auto itemType = AS_TYPE(TListType, type)->GetItemType(); - auto elements = value.GetElements(); - if (elements) { - ui32 size = value.GetListLength(); - for (ui32 i = 0; i < size; ++i) { - buf.Write('\0'); - WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, elements[i], buf); - } - } else { - NUdf::TUnboxedValue item; - for (auto iter = value.GetListIterator(); iter.Next(item); ) { - buf.Write('\0'); - WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, item, buf); - } - } - - buf.Write('\xff'); - } else if (type->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, type); - auto elements = value.GetElements(); - if (elements) { - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, elements[i], buf); - } - } else { - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), buf); - } - } - } else if (type->IsStruct()) { - auto structType = AS_TYPE(TStructType, type); - auto elements = value.GetElements(); - if (auto cookie = type->GetCookie()) { - const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); - if (elements) { - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - const auto ndx = reorder[i]; - WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, elements[ndx], buf); - } - } else { - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - const auto ndx = reorder[i]; - WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, value.GetElement(ndx), buf); - } - } - } else { - if (elements) { - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, elements[i], buf); - } - } else { - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), buf); - } - } - } - } else if (type->IsVariant()) { - auto varType = AS_TYPE(TVariantType, type); - ui16 index = (ui16)value.GetVariantIndex(); - if (varType->GetAlternativesCount() < 256) { - buf.WriteMany((const char*)&index, 1); - } else { - buf.WriteMany((const char*)&index, sizeof(index)); - } - - if (varType->GetUnderlyingType()->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType()); - WriteSkiffNativeYtValue(tupleType->GetElementType(index), nativeYtTypeFlags, value.GetVariantItem(), buf); - } else { - auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType()); - if (auto cookie = structType->GetCookie()) { - const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); - index = reorder[index]; - } - YQL_ENSURE(index < structType->GetMembersCount()); - - WriteSkiffNativeYtValue(structType->GetMemberType(index), nativeYtTypeFlags, value.GetVariantItem(), buf); - } - } else if (type->IsVoid() || type->IsNull() || type->IsEmptyList() || type->IsEmptyDict()) { - } else if (type->IsDict()) { - auto dictType = AS_TYPE(TDictType, type); - auto keyType = dictType->GetKeyType(); - auto payloadType = dictType->GetPayloadType(); - NUdf::TUnboxedValue key, payload; - for (auto iter = value.GetDictIterator(); iter.NextPair(key, payload); ) { - buf.Write('\0'); - WriteSkiffNativeYtValue(keyType, nativeYtTypeFlags, key, buf); - WriteSkiffNativeYtValue(payloadType, nativeYtTypeFlags, payload, buf); - } - - buf.Write('\xff'); - } else { - YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); - } + return ParseYsonValue(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, err); } TExprNode::TPtr ValueToExprLiteral(const TTypeAnnotationNode* type, const NKikimr::NUdf::TUnboxedValuePod& value, TExprContext& ctx, diff --git a/yql/essentials/providers/common/codec/yql_codec.h b/yql/essentials/providers/common/codec/yql_codec.h index f3f50e5c58..741f0b1966 100644 --- a/yql/essentials/providers/common/codec/yql_codec.h +++ b/yql/essentials/providers/common/codec/yql_codec.h @@ -63,43 +63,15 @@ struct TCodecContext { void SkipYson(char cmd, TInputBuf& buf); void CopyYson(char cmd, TInputBuf& buf, TVector<char>& yson); void CopyYsonWithAttrs(char cmd, TInputBuf& buf, TVector<char>& yson); -NKikimr::NUdf::TUnboxedValue ReadYsonValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf, bool isTableFormat); +TStringBuf ReadNextString(char cmd, TInputBuf& buf); +NKikimr::NUdf::TUnboxedValue ReadYsonValue(NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf); TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonValue(const NKikimr::NMiniKQL::THolderFactory& holderFactory, - const TStringBuf& yson, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err, bool isTableFormat); -TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonNode(const NKikimr::NMiniKQL::THolderFactory& holderFactory, - const NYT::TNode& node, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err); + const TStringBuf& yson, NKikimr::NMiniKQL::TType* type, IOutputStream* err); TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonNodeInResultFormat(const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NYT::TNode& node, NKikimr::NMiniKQL::TType* type, IOutputStream* err); -extern "C" void ReadYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, - bool wrapOptional); - -void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, TInputBuf& buf); - -NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, TInputBuf& buf); - -NKikimr::NUdf::TUnboxedValue ReadSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf); -extern "C" void ReadContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, - bool wrapOptional); - -extern "C" void WriteYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); - -void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); - -void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); - -extern "C" void WriteContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, - const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); - -void WriteYsonValueInTableFormat(TOutputBuf& buf, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, bool topLevel); - TExprNode::TPtr ValueToExprLiteral(const TTypeAnnotationNode* type, const NKikimr::NUdf::TUnboxedValuePod& value, TExprContext& ctx, TPositionHandle pos = {}); diff --git a/yt/yql/providers/yt/codec/codegen/ya.make.inc b/yt/yql/providers/yt/codec/codegen/ya.make.inc index 0548d7e545..e418e84d22 100644 --- a/yt/yql/providers/yt/codec/codegen/ya.make.inc +++ b/yt/yql/providers/yt/codec/codegen/ya.make.inc @@ -15,6 +15,7 @@ PEERDIR( yql/essentials/parser/pg_wrapper/interface yql/essentials/utils yt/yql/providers/yt/codec/codegen + yt/yql/providers/yt/codec ) IF (NOT MKQL_DISABLE_CODEGEN) diff --git a/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp b/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp index 61c56e5531..4c7745155f 100644 --- a/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp +++ b/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp @@ -4,6 +4,7 @@ #include <yql/essentials/parser/pg_wrapper/interface/codec.h> #include <yql/essentials/providers/common/codec/yql_codec_buf.h> #include <yql/essentials/providers/common/codec/yql_codec_type_flags.h> +#include <yt/yql/providers/yt/codec/yt_codec.h> #ifndef MKQL_DISABLE_CODEGEN #include <yql/essentials/minikql/mkql_node.h> @@ -171,7 +172,7 @@ public: const auto valType = Type::getInt128Ty(context); const auto flagsConst = ConstantInt::get(Type::getInt64Ty(context), nativeYtTypeFlags); if (nativeYtTypeFlags) { - const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::NCommon::WriteContainerNativeYtValue); + const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::WriteContainerNativeYtValue); const auto funType = FunctionType::get(Type::getVoidTy(context), { Type::getInt64Ty(context), Type::getInt64Ty(context), PointerType::getUnqual(valType), PointerType::getUnqual(Type::getInt8Ty(context)) @@ -180,7 +181,7 @@ public: const auto funcPtr = CastInst::Create(Instruction::IntToPtr, funcAddr, PointerType::getUnqual(funType), "ptr", Block_); CallInst::Create(funType, funcPtr, { typeConst, flagsConst, elemPtr, buf }, "", Block_); } else { - const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::NCommon::WriteYsonContainerValue); + const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::WriteYsonContainerValue); const auto funType = FunctionType::get(Type::getVoidTy(context), { Type::getInt64Ty(context), Type::getInt64Ty(context), PointerType::getUnqual(valType), PointerType::getUnqual(Type::getInt8Ty(context)) @@ -792,7 +793,7 @@ private: void GenerateContainer(Value* velemPtr, Value* buf, TType* type, bool wrapOptional, ui64 nativeYtTypeFlags) { auto& context = Codegen_->GetContext(); - const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), nativeYtTypeFlags ? (ui64)&NCommon::ReadContainerNativeYtValue : (ui64)&NCommon::ReadYsonContainerValue); + const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), nativeYtTypeFlags ? (ui64)&ReadContainerNativeYtValue : (ui64)&ReadYsonContainerValue); const auto typeConst = ConstantInt::get(Type::getInt64Ty(context), (ui64)type); const auto holderFactoryConst = ConstantInt::get(Type::getInt64Ty(context), (ui64)&HolderFactory_); const auto wrapConst = ConstantInt::get(Type::getInt1Ty(context), wrapOptional); diff --git a/yt/yql/providers/yt/codec/ya.make b/yt/yql/providers/yt/codec/ya.make index 8853ac1485..ef0593121f 100644 --- a/yt/yql/providers/yt/codec/ya.make +++ b/yt/yql/providers/yt/codec/ya.make @@ -33,6 +33,7 @@ PEERDIR( yt/yql/providers/yt/common yt/yql/providers/yt/lib/mkql_helpers yt/yql/providers/yt/lib/skiff + yt/yt/library/decimal yql/essentials/providers/common/codec/yt_arrow_converter_interface ) diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp index 86381e1920..6e64136937 100644 --- a/yt/yql/providers/yt/codec/yt_codec.cpp +++ b/yt/yql/providers/yt/codec/yt_codec.cpp @@ -10,9 +10,17 @@ #include <yql/essentials/minikql/mkql_node_builder.h> #include <yql/essentials/minikql/mkql_string_util.h> #include <yql/essentials/utils/yql_panic.h> +#include <yql/essentials/utils/swap_bytes.h> #include <yql/essentials/core/yql_type_annotation.h> +#include <yql/essentials/public/result_format/yql_codec_results.h> +#include <yql/essentials/public/decimal/yql_decimal.h> +#include <yql/essentials/public/decimal/yql_decimal_serialize.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h> + +#include <yt/yt/library/decimal/decimal.h> #include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/detail.h> #include <util/generic/hash_set.h> #include <util/generic/map.h> @@ -28,6 +36,7 @@ namespace NYql { using namespace NKikimr; using namespace NKikimr::NMiniKQL; +using namespace NYson::NDetail; ////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -675,4 +684,1913 @@ TMkqlIOCache::TMkqlIOCache(const TMkqlIOSpecs& specs, const THolderFactory& hold } } +template <typename T> +T ReadYsonFloatNumberInTableFormat(char cmd, NCommon::TInputBuf& buf) { + CHECK_EXPECTED(cmd, DoubleMarker); + double dbl; + buf.ReadMany((char*)&dbl, sizeof(dbl)); + return dbl; +} + +NUdf::TUnboxedValue ReadYsonValueInTableFormat(TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, NCommon::TInputBuf& buf) { + switch (type->GetKind()) { + case TType::EKind::Variant: { + auto varType = static_cast<TVariantType*>(type); + auto underlyingType = varType->GetUnderlyingType(); + if (nativeYtTypeFlags & NTCF_COMPLEX) { + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + TType* type = nullptr; + i64 index = 0; + if (cmd == StringMarker) { + YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); + auto structType = static_cast<TStructType*>(underlyingType); + auto nameBuffer = ReadNextString(cmd, buf); + auto foundIndex = structType->FindMemberIndex(nameBuffer); + YQL_ENSURE(foundIndex.Defined(), "Unexpected member: " << nameBuffer); + index = *foundIndex; + type = varType->GetAlternativeType(index); + } else { + YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker); + YQL_ENSURE(underlyingType->IsTuple(), "Expected tuple as underlying type"); + if (cmd == Uint64Marker) { + index = buf.ReadVarUI64(); + } else { + index = buf.ReadVarI64(); + } + YQL_ENSURE(0 <= index && index < varType->GetAlternativesCount(), "Unexpected member index: " << index); + type = varType->GetAlternativeType(index); + } + cmd = buf.Read(); + CHECK_EXPECTED(cmd, ListItemSeparatorSymbol); + cmd = buf.Read(); + auto value = ReadYsonValueInTableFormat(type, nativeYtTypeFlags, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd != EndListSymbol) { + CHECK_EXPECTED(cmd, ListItemSeparatorSymbol); + cmd = buf.Read(); + CHECK_EXPECTED(cmd, EndListSymbol); + } + return holderFactory.CreateVariantHolder(value.Release(), index); + } else { + if (cmd == StringMarker) { + YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type"); + auto name = ReadNextString(cmd, buf); + auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name); + YQL_ENSURE(index, "Unexpected member: " << name); + YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type"); + return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index); + } + + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + i64 index = 0; + YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker); + if (cmd == Uint64Marker) { + index = buf.ReadVarUI64(); + } else { + index = buf.ReadVarI64(); + } + + YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << + varType->GetAlternativesCount() << " are available"); + YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type"); + TType* itemType; + if (underlyingType->IsTuple()) { + itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index); + } + else { + itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index); + } + + EXPECTED(buf, ListItemSeparatorSymbol); + cmd = buf.Read(); + auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + + CHECK_EXPECTED(cmd, EndListSymbol); + return holderFactory.CreateVariantHolder(value.Release(), index); + } + } + + case TType::EKind::Data: { + auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); + switch (schemeType) { + case NUdf::TDataType<bool>::Id: + YQL_ENSURE(cmd == FalseMarker || cmd == TrueMarker, "Expected either true or false, but got: " << TString(cmd).Quote()); + return NUdf::TUnboxedValuePod(cmd == TrueMarker); + + case NUdf::TDataType<ui8>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod(ui8(buf.ReadVarUI64())); + + case NUdf::TDataType<i8>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod(i8(buf.ReadVarI64())); + + case NUdf::TDataType<ui16>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod(ui16(buf.ReadVarUI64())); + + case NUdf::TDataType<i16>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod(i16(buf.ReadVarI64())); + + case NUdf::TDataType<i32>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod(i32(buf.ReadVarI64())); + + case NUdf::TDataType<ui32>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod(ui32(buf.ReadVarUI64())); + + case NUdf::TDataType<i64>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod(buf.ReadVarI64()); + + case NUdf::TDataType<ui64>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod(buf.ReadVarUI64()); + + case NUdf::TDataType<float>::Id: + return NUdf::TUnboxedValuePod(ReadYsonFloatNumberInTableFormat<float>(cmd, buf)); + + case NUdf::TDataType<double>::Id: + return NUdf::TUnboxedValuePod(ReadYsonFloatNumberInTableFormat<double>(cmd, buf)); + + case NUdf::TDataType<NUdf::TUtf8>::Id: + case NUdf::TDataType<char*>::Id: + case NUdf::TDataType<NUdf::TJson>::Id: + case NUdf::TDataType<NUdf::TDyNumber>::Id: + case NUdf::TDataType<NUdf::TUuid>::Id: { + auto nextString = ReadNextString(cmd, buf); + return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(nextString))); + } + + case NUdf::TDataType<NUdf::TDecimal>::Id: { + auto nextString = ReadNextString(cmd, buf); + if (nativeYtTypeFlags & NTCF_DECIMAL) { + auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); + if (params.first < 10) { + // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf + NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary32(params.first, nextString)); + YQL_ENSURE(!NDecimal::IsError(res)); + return NUdf::TUnboxedValuePod(res); + } else if (params.first < 19) { + NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary64(params.first, nextString)); + YQL_ENSURE(!NDecimal::IsError(res)); + return NUdf::TUnboxedValuePod(res); + } else { + YQL_ENSURE(params.first < 36); + NYT::NDecimal::TDecimal::TValue128 tmpRes = NYT::NDecimal::TDecimal::ParseBinary128(params.first, nextString); + NDecimal::TInt128 res; + static_assert(sizeof(NDecimal::TInt128) == sizeof(NYT::NDecimal::TDecimal::TValue128)); + memcpy(&res, &tmpRes, sizeof(NDecimal::TInt128)); + res = NDecimal::FromYtDecimal(res); + YQL_ENSURE(!NDecimal::IsError(res)); + return NUdf::TUnboxedValuePod(res); + } + } + else { + const auto& des = NDecimal::Deserialize(nextString.data(), nextString.size()); + YQL_ENSURE(!NDecimal::IsError(des.first)); + YQL_ENSURE(nextString.size() == des.second); + return NUdf::TUnboxedValuePod(des.first); + } + } + + case NUdf::TDataType<NUdf::TYson>::Id: { + auto& yson = buf.YsonBuffer(); + yson.clear(); + CopyYsonWithAttrs(cmd, buf, yson); + + return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(yson))); + } + + case NUdf::TDataType<NUdf::TDate>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod((ui16)buf.ReadVarUI64()); + + case NUdf::TDataType<NUdf::TDatetime>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod((ui32)buf.ReadVarUI64()); + + case NUdf::TDataType<NUdf::TTimestamp>::Id: + CHECK_EXPECTED(cmd, Uint64Marker); + return NUdf::TUnboxedValuePod(buf.ReadVarUI64()); + + case NUdf::TDataType<NUdf::TInterval>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod(buf.ReadVarI64()); + + case NUdf::TDataType<NUdf::TTzDate>::Id: { + auto nextString = ReadNextString(cmd, buf); + NUdf::TUnboxedValuePod data; + ui16 value; + ui16 tzId = 0; + YQL_ENSURE(DeserializeTzDate(nextString, value, tzId)); + data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TTzDatetime>::Id: { + auto nextString = ReadNextString(cmd, buf); + NUdf::TUnboxedValuePod data; + ui32 value; + ui16 tzId = 0; + YQL_ENSURE(DeserializeTzDatetime(nextString, value, tzId)); + data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { + auto nextString = ReadNextString(cmd, buf); + NUdf::TUnboxedValuePod data; + ui64 value; + ui16 tzId = 0; + YQL_ENSURE(DeserializeTzTimestamp(nextString, value, tzId)); + data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TDate32>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod((i32)buf.ReadVarI64()); + + case NUdf::TDataType<NUdf::TDatetime64>::Id: + case NUdf::TDataType<NUdf::TTimestamp64>::Id: + case NUdf::TDataType<NUdf::TInterval64>::Id: + CHECK_EXPECTED(cmd, Int64Marker); + return NUdf::TUnboxedValuePod(buf.ReadVarI64()); + + case NUdf::TDataType<NUdf::TJsonDocument>::Id: { + return ValueFromString(EDataSlot::JsonDocument, ReadNextString(cmd, buf)); + } + + case NUdf::TDataType<NUdf::TTzDate32>::Id: { + auto nextString = ReadNextString(cmd, buf); + NUdf::TUnboxedValuePod data; + i32 value; + ui16 tzId = 0; + YQL_ENSURE(DeserializeTzDate32(nextString, value, tzId)); + data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TTzDatetime64>::Id: { + auto nextString = ReadNextString(cmd, buf); + NUdf::TUnboxedValuePod data; + i64 value; + ui16 tzId = 0; + YQL_ENSURE(DeserializeTzDatetime64(nextString, value, tzId)); + data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: { + auto nextString = ReadNextString(cmd, buf); + NUdf::TUnboxedValuePod data; + i64 value; + ui16 tzId = 0; + YQL_ENSURE(DeserializeTzTimestamp64(nextString, value, tzId)); + data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + default: + YQL_ENSURE(false, "Unsupported data type: " << schemeType); + } + } + + case TType::EKind::Struct: { + YQL_ENSURE(cmd == BeginListSymbol || cmd == BeginMapSymbol); + auto structType = static_cast<TStructType*>(type); + NUdf::TUnboxedValue* items; + NUdf::TUnboxedValue ret = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), items); + if (cmd == BeginListSymbol) { + cmd = buf.Read(); + + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + items[i] = ReadYsonValueInTableFormat(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, cmd, buf); + + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + } + + CHECK_EXPECTED(cmd, EndListSymbol); + return ret; + } else { + cmd = buf.Read(); + + for (;;) { + if (cmd == EndMapSymbol) { + break; + } + + auto keyBuffer = ReadNextString(cmd, buf); + auto pos = structType->FindMemberIndex(keyBuffer); + EXPECTED(buf, KeyValueSeparatorSymbol); + cmd = buf.Read(); + if (pos && cmd != '#') { + auto memberType = structType->GetMemberType(*pos); + auto unwrappedType = memberType; + if (!(nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) && unwrappedType->IsOptional()) { + unwrappedType = static_cast<TOptionalType*>(unwrappedType)->GetItemType(); + } + + items[*pos] = ReadYsonValueInTableFormat(unwrappedType, nativeYtTypeFlags, holderFactory, cmd, buf); + } else { + SkipYson(cmd, buf); + } + + cmd = buf.Read(); + if (cmd == KeyedItemSeparatorSymbol) { + cmd = buf.Read(); + } + } + + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + if (items[i]) { + continue; + } + + YQL_ENSURE(structType->GetMemberType(i)->IsOptional(), "Missing required field: " << structType->GetMemberName(i)); + } + + return ret; + } + } + + case TType::EKind::List: { + auto itemType = static_cast<TListType*>(type)->GetItemType(); + TDefaultListRepresentation items; + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + + for (;;) { + if (cmd == EndListSymbol) { + break; + } + + items = items.Append(ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf)); + + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + } + + return holderFactory.CreateDirectListHolder(std::move(items)); + } + + case TType::EKind::Optional: { + if (cmd == EntitySymbol) { + return NUdf::TUnboxedValuePod(); + } + auto itemType = static_cast<TOptionalType*>(type)->GetItemType(); + if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) { + if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + CHECK_EXPECTED(cmd, EndListSymbol); + return value.Release().MakeOptional(); + } else { + return ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf).Release().MakeOptional(); + } + } else { + if (cmd != BeginListSymbol) { + auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf); + return value.Release().MakeOptional(); + } + + cmd = buf.Read(); + if (cmd == EndListSymbol) { + return NUdf::TUnboxedValuePod(); + } + + auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + + CHECK_EXPECTED(cmd, EndListSymbol); + return value.Release().MakeOptional(); + } + } + + case TType::EKind::Dict: { + auto dictType = static_cast<TDictType*>(type); + auto keyType = dictType->GetKeyType(); + auto payloadType = dictType->GetPayloadType(); + TKeyTypes types; + bool isTuple; + bool encoded; + bool useIHash; + GetDictionaryKeyTypes(keyType, types, isTuple, encoded, useIHash); + + TMaybe<TValuePacker> packer; + if (encoded) { + packer.ConstructInPlace(true, keyType); + } + + YQL_ENSURE(cmd == BeginListSymbol || cmd == BeginMapSymbol, "Expected '{' or '[', but read: " << TString(cmd).Quote()); + if (cmd == BeginMapSymbol) { + bool unusedIsOptional; + auto unpackedType = UnpackOptional(keyType, unusedIsOptional); + YQL_ENSURE(unpackedType->IsData() && + (static_cast<TDataType*>(unpackedType)->GetSchemeType() == NUdf::TDataType<char*>::Id || + static_cast<TDataType*>(unpackedType)->GetSchemeType() == NUdf::TDataType<NUdf::TUtf8>::Id), + "Expected String or Utf8 type as dictionary key type"); + + auto filler = [&](TValuesDictHashMap& map) { + cmd = buf.Read(); + + for (;;) { + if (cmd == EndMapSymbol) { + break; + } + + auto keyBuffer = ReadNextString(cmd, buf); + auto keyStr = NUdf::TUnboxedValue(MakeString(keyBuffer)); + EXPECTED(buf, KeyValueSeparatorSymbol); + cmd = buf.Read(); + auto payload = ReadYsonValueInTableFormat(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf); + map.emplace(std::move(keyStr), std::move(payload)); + + cmd = buf.Read(); + if (cmd == KeyedItemSeparatorSymbol) { + cmd = buf.Read(); + } + } + }; + + const NUdf::IHash* hash = holderFactory.GetHash(*keyType, useIHash); + const NUdf::IEquate* equate = holderFactory.GetEquate(*keyType, useIHash); + return holderFactory.CreateDirectHashedDictHolder(filler, types, isTuple, true, nullptr, hash, equate); + } + else { + auto filler = [&](TValuesDictHashMap& map) { + cmd = buf.Read(); + + for (;;) { + if (cmd == EndListSymbol) { + break; + } + + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + auto key = ReadYsonValueInTableFormat(keyType, nativeYtTypeFlags, holderFactory, cmd, buf); + EXPECTED(buf, ListItemSeparatorSymbol); + cmd = buf.Read(); + auto payload = ReadYsonValueInTableFormat(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf); + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + + CHECK_EXPECTED(cmd, EndListSymbol); + if (packer) { + key = MakeString(packer->Pack(key)); + } + + map.emplace(std::move(key), std::move(payload)); + + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + } + }; + + const NUdf::IHash* hash = holderFactory.GetHash(*keyType, useIHash); + const NUdf::IEquate* equate = holderFactory.GetEquate(*keyType, useIHash); + return holderFactory.CreateDirectHashedDictHolder(filler, types, isTuple, true, encoded ? keyType : nullptr, + hash, equate); + } + } + + case TType::EKind::Tuple: { + auto tupleType = static_cast<TTupleType*>(type); + NUdf::TUnboxedValue* items; + NUdf::TUnboxedValue ret = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), items); + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + items[i] = ReadYsonValueInTableFormat(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, cmd, buf); + + cmd = buf.Read(); + if (cmd == ListItemSeparatorSymbol) { + cmd = buf.Read(); + } + } + + + CHECK_EXPECTED(cmd, EndListSymbol); + return ret; + } + + case TType::EKind::Void: { + if (cmd == EntitySymbol) { + return NUdf::TUnboxedValuePod::Void(); + } + + auto nextString = ReadNextString(cmd, buf); + YQL_ENSURE(nextString == NResult::TYsonResultWriter::VoidString, "Expected Void"); + return NUdf::TUnboxedValuePod::Void(); + } + + case TType::EKind::Null: { + CHECK_EXPECTED(cmd, EntitySymbol); + return NUdf::TUnboxedValuePod(); + } + + case TType::EKind::EmptyList: { + CHECK_EXPECTED(cmd, BeginListSymbol); + cmd = buf.Read(); + CHECK_EXPECTED(cmd, EndListSymbol); + return holderFactory.GetEmptyContainerLazy(); + } + + case TType::EKind::EmptyDict: { + YQL_ENSURE(cmd == BeginListSymbol || cmd == BeginMapSymbol, "Expected '{' or '[', but read: " << TString(cmd).Quote()); + if (cmd == BeginListSymbol) { + cmd = buf.Read(); + CHECK_EXPECTED(cmd, EndListSymbol); + } else { + cmd = buf.Read(); + CHECK_EXPECTED(cmd, EndMapSymbol); + } + + return holderFactory.GetEmptyContainerLazy(); + } + + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + return ReadYsonValueInTableFormatPg(pgType, cmd, buf); + } + + case TType::EKind::Tagged: { + auto taggedType = static_cast<TTaggedType*>(type); + return ReadYsonValueInTableFormat(taggedType->GetBaseType(), nativeYtTypeFlags, holderFactory, cmd, buf); + } + + default: + YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); + } +} + +TMaybe<NUdf::TUnboxedValue> ParseYsonValueInTableFormat(const THolderFactory& holderFactory, + const TStringBuf& yson, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err) { + try { + class TReader : public NCommon::IBlockReader { + public: + TReader(const TStringBuf& yson) + : Yson_(yson) + {} + + void SetDeadline(TInstant deadline) override { + Y_UNUSED(deadline); + } + + std::pair<const char*, const char*> NextFilledBlock() override { + if (FirstBuffer_) { + FirstBuffer_ = false; + return{ Yson_.begin(), Yson_.end() }; + } + else { + return{ nullptr, nullptr }; + } + } + + void ReturnBlock() override { + } + + bool Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex, const std::exception_ptr& error) override { + Y_UNUSED(rangeIndex); + Y_UNUSED(rowIndex); + Y_UNUSED(error); + return false; + } + + private: + TStringBuf Yson_; + bool FirstBuffer_ = true; + }; + + TReader reader(yson); + NCommon::TInputBuf buf(reader, nullptr); + char cmd = buf.Read(); + return ReadYsonValueInTableFormat(type, nativeYtTypeFlags, holderFactory, cmd, buf); + } + catch (const yexception& e) { + if (err) { + *err << "YSON parsing failed: " << e.what(); + } + return Nothing(); + } +} + +TMaybe<NUdf::TUnboxedValue> ParseYsonNode(const THolderFactory& holderFactory, + const NYT::TNode& node, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err) { + return ParseYsonValueInTableFormat(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, nativeYtTypeFlags, err); +} + +extern "C" void ReadYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, bool wrapOptional) { + // yson content + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + // parse binary yson... + YQL_ENSURE(size > 0); + char cmd = buf.Read(); + auto tmp = ReadYsonValueInTableFormat(type, nativeYtTypeFlags, holderFactory, cmd, buf); + if (!wrapOptional) { + value = std::move(tmp); + } + else { + value = tmp.Release().MakeOptional(); + } +} + +extern "C" void ReadContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, bool wrapOptional) { + auto tmp = ReadSkiffNativeYtValue(type, nativeYtTypeFlags, holderFactory, buf); + if (!wrapOptional) { + value = std::move(tmp); + } else { + value = tmp.Release().MakeOptional(); + } +} + +void WriteYsonValueInTableFormat(NCommon::TOutputBuf& buf, TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, bool topLevel) { + // Table format, very compact + switch (type->GetKind()) { + case TType::EKind::Variant: { + buf.Write(BeginListSymbol); + auto varType = static_cast<TVariantType*>(type); + auto underlyingType = varType->GetUnderlyingType(); + auto index = value.GetVariantIndex(); + YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << varType->GetAlternativesCount() << " are available"); + YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type"); + TType* itemType; + if (underlyingType->IsTuple()) { + itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index); + } + else { + itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index); + } + if (!(nativeYtTypeFlags & NTCF_COMPLEX) || underlyingType->IsTuple()) { + buf.Write(Uint64Marker); + buf.WriteVarUI64(index); + } else { + auto structType = static_cast<TStructType*>(underlyingType); + auto varName = structType->GetMemberName(index); + buf.Write(StringMarker); + buf.WriteVarI32(varName.size()); + buf.WriteMany(varName); + } + buf.Write(ListItemSeparatorSymbol); + WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetVariantItem(), false); + buf.Write(ListItemSeparatorSymbol); + buf.Write(EndListSymbol); + break; + } + + case TType::EKind::Data: { + auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); + switch (schemeType) { + case NUdf::TDataType<bool>::Id: { + buf.Write(value.Get<bool>() ? TrueMarker : FalseMarker); + break; + } + + case NUdf::TDataType<ui8>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui8>()); + break; + + case NUdf::TDataType<i8>::Id: + buf.Write(Int64Marker); + buf.WriteVarI64(value.Get<i8>()); + break; + + case NUdf::TDataType<ui16>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui16>()); + break; + + case NUdf::TDataType<i16>::Id: + buf.Write(Int64Marker); + buf.WriteVarI64(value.Get<i16>()); + break; + + case NUdf::TDataType<i32>::Id: + buf.Write(Int64Marker); + buf.WriteVarI64(value.Get<i32>()); + break; + + case NUdf::TDataType<ui32>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui32>()); + break; + + case NUdf::TDataType<i64>::Id: + buf.Write(Int64Marker); + buf.WriteVarI64(value.Get<i64>()); + break; + + case NUdf::TDataType<ui64>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui64>()); + break; + + case NUdf::TDataType<float>::Id: { + buf.Write(DoubleMarker); + double val = value.Get<float>(); + buf.WriteMany((const char*)&val, sizeof(val)); + break; + } + + case NUdf::TDataType<double>::Id: { + buf.Write(DoubleMarker); + double val = value.Get<double>(); + buf.WriteMany((const char*)&val, sizeof(val)); + break; + } + + case NUdf::TDataType<NUdf::TUtf8>::Id: + case NUdf::TDataType<char*>::Id: + case NUdf::TDataType<NUdf::TJson>::Id: + case NUdf::TDataType<NUdf::TDyNumber>::Id: + case NUdf::TDataType<NUdf::TUuid>::Id: { + buf.Write(StringMarker); + auto str = value.AsStringRef(); + buf.WriteVarI32(str.Size()); + buf.WriteMany(str); + break; + } + + case NUdf::TDataType<NUdf::TDecimal>::Id: { + buf.Write(StringMarker); + if (nativeYtTypeFlags & NTCF_DECIMAL){ + auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); + const NDecimal::TInt128 data128 = value.GetInt128(); + char tmpBuf[NYT::NDecimal::TDecimal::MaxBinarySize]; + if (params.first < 10) { + // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf + TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary32(params.first, NDecimal::ToYtDecimal<i32>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize); + buf.WriteVarI32(resBuf.size()); + buf.WriteMany(resBuf.data(), resBuf.size()); + } else if (params.first < 19) { + TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary64(params.first, NDecimal::ToYtDecimal<i64>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize); + buf.WriteVarI32(resBuf.size()); + buf.WriteMany(resBuf.data(), resBuf.size()); + } else { + YQL_ENSURE(params.first < 36); + NYT::NDecimal::TDecimal::TValue128 val; + auto data128Converted = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128); + memcpy(&val, &data128Converted, sizeof(val)); + auto resBuf = NYT::NDecimal::TDecimal::WriteBinary128(params.first, val, tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize); + buf.WriteVarI32(resBuf.size()); + buf.WriteMany(resBuf.data(), resBuf.size()); + } + } else { + char data[sizeof(NDecimal::TInt128)]; + const ui32 size = NDecimal::Serialize(value.GetInt128(), data); + buf.WriteVarI32(size); + buf.WriteMany(data, size); + } + break; + } + + case NUdf::TDataType<NUdf::TYson>::Id: { + // embed content + buf.WriteMany(value.AsStringRef()); + break; + } + + case NUdf::TDataType<NUdf::TDate>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui16>()); + break; + + case NUdf::TDataType<NUdf::TDatetime>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui32>()); + break; + + case NUdf::TDataType<NUdf::TTimestamp>::Id: + buf.Write(Uint64Marker); + buf.WriteVarUI64(value.Get<ui64>()); + break; + + case NUdf::TDataType<NUdf::TInterval>::Id: + case NUdf::TDataType<NUdf::TInterval64>::Id: + case NUdf::TDataType<NUdf::TDatetime64>::Id: + case NUdf::TDataType<NUdf::TTimestamp64>::Id: + buf.Write(Int64Marker); + buf.WriteVarI64(value.Get<i64>()); + break; + + case NUdf::TDataType<NUdf::TDate32>::Id: + buf.Write(Int64Marker); + buf.WriteVarI64(value.Get<i32>()); + break; + + case NUdf::TDataType<NUdf::TTzDate>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui16 data = SwapBytes(value.Get<ui16>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.Write(StringMarker); + buf.WriteVarI32(size); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzDatetime>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui32 data = SwapBytes(value.Get<ui32>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.Write(StringMarker); + buf.WriteVarI32(size); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui64 data = SwapBytes(value.Get<ui64>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.Write(StringMarker); + buf.WriteVarI32(size); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzDate32>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.Write(StringMarker); + buf.WriteVarI32(size); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzDatetime64>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.Write(StringMarker); + buf.WriteVarI32(size); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.Write(StringMarker); + buf.WriteVarI32(size); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TJsonDocument>::Id: { + buf.Write(StringMarker); + NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); + auto str = json.AsStringRef(); + buf.WriteVarI32(str.Size()); + buf.WriteMany(str); + break; + } + + default: + YQL_ENSURE(false, "Unsupported data type: " << schemeType); + } + + break; + } + + case TType::EKind::Struct: { + auto structType = static_cast<TStructType*>(type); + if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) { + buf.Write(BeginMapSymbol); + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + buf.Write(StringMarker); + auto key = structType->GetMemberName(i); + buf.WriteVarI32(key.size()); + buf.WriteMany(key); + buf.Write(KeyValueSeparatorSymbol); + WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false); + buf.Write(KeyedItemSeparatorSymbol); + } + buf.Write(EndMapSymbol); + } else { + buf.Write(BeginListSymbol); + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false); + buf.Write(ListItemSeparatorSymbol); + } + buf.Write(EndListSymbol); + } + break; + } + + case TType::EKind::List: { + auto itemType = static_cast<TListType*>(type)->GetItemType(); + const auto iter = value.GetListIterator(); + buf.Write(BeginListSymbol); + for (NUdf::TUnboxedValue item; iter.Next(item); buf.Write(ListItemSeparatorSymbol)) { + WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, item, false); + } + + buf.Write(EndListSymbol); + break; + } + + case TType::EKind::Optional: { + auto itemType = static_cast<TOptionalType*>(type)->GetItemType(); + if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) { + if (value) { + if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { + buf.Write(BeginListSymbol); + } + WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false); + if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { + buf.Write(ListItemSeparatorSymbol); + buf.Write(EndListSymbol); + } + } else { + buf.Write(EntitySymbol); + } + } else { + if (!value) { + if (topLevel) { + buf.Write(BeginListSymbol); + buf.Write(EndListSymbol); + } + else { + buf.Write(EntitySymbol); + } + } + else { + buf.Write(BeginListSymbol); + WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false); + buf.Write(ListItemSeparatorSymbol); + buf.Write(EndListSymbol); + } + } + break; + } + + case TType::EKind::Dict: { + auto dictType = static_cast<TDictType*>(type); + const auto iter = value.GetDictIterator(); + buf.Write(BeginListSymbol); + for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) { + buf.Write(BeginListSymbol); + WriteYsonValueInTableFormat(buf, dictType->GetKeyType(), nativeYtTypeFlags, key, false); + buf.Write(ListItemSeparatorSymbol); + WriteYsonValueInTableFormat(buf, dictType->GetPayloadType(), nativeYtTypeFlags, payload, false); + buf.Write(ListItemSeparatorSymbol); + buf.Write(EndListSymbol); + buf.Write(ListItemSeparatorSymbol); + } + + buf.Write(EndListSymbol); + break; + } + + case TType::EKind::Tuple: { + auto tupleType = static_cast<TTupleType*>(type); + buf.Write(BeginListSymbol); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + WriteYsonValueInTableFormat(buf, tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), false); + buf.Write(ListItemSeparatorSymbol); + } + + buf.Write(EndListSymbol); + break; + } + + case TType::EKind::Void: { + buf.Write(EntitySymbol); + break; + } + + case TType::EKind::Null: { + buf.Write(EntitySymbol); + break; + } + + case TType::EKind::EmptyList: { + buf.Write(BeginListSymbol); + buf.Write(EndListSymbol); + break; + } + + case TType::EKind::EmptyDict: { + buf.Write(BeginListSymbol); + buf.Write(EndListSymbol); + break; + } + + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + WriteYsonValueInTableFormatPg(buf, pgType, value, topLevel); + break; + } + + default: + YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); + } +} + +/////////////////////////////////////////// +// +// Initial state first = last = &dummy +// +// +1 block first = &dummy, last = newPage, first.next = newPage, newPage.next= &dummy +// +1 block first = &dummy, last = newPage2, first.next = newPage, newPage.next = newPage2, newPage2.next = &dummy +// +/////////////////////////////////////////// +class TTempBlockWriter : public NCommon::IBlockWriter { +public: + TTempBlockWriter() + : Pool_(*TlsAllocState) + , Last_(&Dummy_) + { + Dummy_.Avail_ = 0; + Dummy_.Next_ = &Dummy_; + } + + ~TTempBlockWriter() { + auto current = Dummy_.Next_; // skip dummy node + while (current != &Dummy_) { + auto next = current->Next_; + Pool_.ReturnPage(current); + current = next; + } + } + + void SetRecordBoundaryCallback(std::function<void()> callback) override { + Y_UNUSED(callback); + } + + void WriteBlocks(NCommon::TOutputBuf& buf) const { + auto current = Dummy_.Next_; // skip dummy node + while (current != &Dummy_) { + auto next = current->Next_; + buf.WriteMany((const char*)(current + 1), current->Avail_); + current = next; + } + } + + TTempBlockWriter(const TTempBlockWriter&) = delete; + void operator=(const TTempBlockWriter&) = delete; + + std::pair<char*, char*> NextEmptyBlock() override { + auto newPage = Pool_.GetPage(); + auto header = (TPageHeader*)newPage; + header->Avail_ = 0; + header->Next_ = &Dummy_; + Last_->Next_ = header; + Last_ = header; + return std::make_pair((char*)(header + 1), (char*)newPage + TAlignedPagePool::POOL_PAGE_SIZE); + } + + void ReturnBlock(size_t avail, std::optional<size_t> lastRecordBoundary) override { + Y_UNUSED(lastRecordBoundary); + YQL_ENSURE(avail <= TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TPageHeader)); + Last_->Avail_ = avail; + } + + void Finish() override { + } + +private: + struct TPageHeader { + TPageHeader* Next_ = nullptr; + ui32 Avail_ = 0; + }; + + NKikimr::TAlignedPagePool& Pool_; + TPageHeader* Last_; + TPageHeader Dummy_; +}; + +extern "C" void WriteYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { + TTempBlockWriter blockWriter; + NCommon::TOutputBuf ysonBuf(blockWriter, nullptr); + WriteYsonValueInTableFormat(ysonBuf, type, nativeYtTypeFlags, value, true); + ysonBuf.Flush(); + ui32 size = ysonBuf.GetWrittenBytes(); + buf.WriteMany((const char*)&size, sizeof(size)); + blockWriter.WriteBlocks(buf); +} + +void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf) { + const bool isOptional = type->IsOptional(); + if (isOptional) { + // Unwrap optional + type = static_cast<TOptionalType*>(type)->GetItemType(); + } + + if (isOptional) { + auto marker = buf.Read(); + if (!marker) { + return; + } + } + + if (type->IsData()) { + auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); + switch (schemeType) { + case NUdf::TDataType<bool>::Id: + buf.SkipMany(sizeof(ui8)); + break; + + case NUdf::TDataType<ui8>::Id: + case NUdf::TDataType<ui16>::Id: + case NUdf::TDataType<ui32>::Id: + case NUdf::TDataType<ui64>::Id: + case NUdf::TDataType<NUdf::TDate>::Id: + case NUdf::TDataType<NUdf::TDatetime>::Id: + case NUdf::TDataType<NUdf::TTimestamp>::Id: + buf.SkipMany(sizeof(ui64)); + break; + + case NUdf::TDataType<i8>::Id: + case NUdf::TDataType<i16>::Id: + case NUdf::TDataType<i32>::Id: + case NUdf::TDataType<i64>::Id: + case NUdf::TDataType<NUdf::TInterval>::Id: + case NUdf::TDataType<NUdf::TDate32>::Id: + case NUdf::TDataType<NUdf::TDatetime64>::Id: + case NUdf::TDataType<NUdf::TTimestamp64>::Id: + case NUdf::TDataType<NUdf::TInterval64>::Id: + buf.SkipMany(sizeof(i64)); + break; + + case NUdf::TDataType<float>::Id: + case NUdf::TDataType<double>::Id: + buf.SkipMany(sizeof(double)); + break; + + case NUdf::TDataType<NUdf::TUtf8>::Id: + case NUdf::TDataType<char*>::Id: + case NUdf::TDataType<NUdf::TJson>::Id: + case NUdf::TDataType<NUdf::TYson>::Id: + case NUdf::TDataType<NUdf::TUuid>::Id: + case NUdf::TDataType<NUdf::TDyNumber>::Id: + case NUdf::TDataType<NUdf::TTzDate>::Id: + case NUdf::TDataType<NUdf::TTzDatetime>::Id: + case NUdf::TDataType<NUdf::TTzTimestamp>::Id: + case NUdf::TDataType<NUdf::TJsonDocument>::Id: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + buf.SkipMany(size); + break; + } + case NUdf::TDataType<NUdf::TDecimal>::Id: { + if (nativeYtTypeFlags & NTCF_DECIMAL) { + auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); + if (params.first < 10) { + buf.SkipMany(sizeof(i32)); + } else if (params.first < 19) { + buf.SkipMany(sizeof(i64)); + } else { + buf.SkipMany(sizeof(NDecimal::TInt128)); + } + } else { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + buf.SkipMany(size); + } + break; + } + default: + YQL_ENSURE(false, "Unsupported data type: " << schemeType); + } + return; + } + + if (type->IsPg()) { + SkipSkiffPg(static_cast<TPgType*>(type), buf); + return; + } + + if (type->IsStruct()) { + auto structType = static_cast<TStructType*>(type); + const std::vector<size_t>* reorder = nullptr; + if (auto cookie = structType->GetCookie()) { + reorder = ((const std::vector<size_t>*)cookie); + } + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + SkipSkiffField(structType->GetMemberType(reorder ? reorder->at(i) : i), nativeYtTypeFlags, buf); + } + return; + } + + if (type->IsList()) { + auto itemType = static_cast<TListType*>(type)->GetItemType(); + while (buf.Read() == '\0') { + SkipSkiffField(itemType, nativeYtTypeFlags, buf); + } + return; + } + + if (type->IsTuple()) { + auto tupleType = static_cast<TTupleType*>(type); + + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + SkipSkiffField(tupleType->GetElementType(i), nativeYtTypeFlags, buf); + } + return; + } + + if (type->IsVariant()) { + auto varType = AS_TYPE(TVariantType, type); + ui16 data = 0; + if (varType->GetAlternativesCount() < 256) { + buf.ReadMany((char*)&data, 1); + } else { + buf.ReadMany((char*)&data, sizeof(data)); + } + + if (varType->GetUnderlyingType()->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType()); + YQL_ENSURE(data < tupleType->GetElementsCount()); + SkipSkiffField(tupleType->GetElementType(data), nativeYtTypeFlags, buf); + } else { + auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType()); + if (auto cookie = structType->GetCookie()) { + const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); + data = reorder[data]; + } + YQL_ENSURE(data < structType->GetMembersCount()); + + SkipSkiffField(structType->GetMemberType(data), nativeYtTypeFlags, buf); + } + return; + } + + if (type->IsVoid()) { + return; + } + + if (type->IsNull()) { + return; + } + + if (type->IsEmptyList() || type->IsEmptyDict()) { + return; + } + + if (type->IsDict()) { + auto dictType = AS_TYPE(TDictType, type); + auto keyType = dictType->GetKeyType(); + auto payloadType = dictType->GetPayloadType(); + while (buf.Read() == '\0') { + SkipSkiffField(keyType, nativeYtTypeFlags, buf); + SkipSkiffField(payloadType, nativeYtTypeFlags, buf); + } + return; + } + + YQL_ENSURE(false, "Unsupported type for skip: " << type->GetKindAsStr()); +} + +NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, NCommon::TInputBuf& buf) +{ + if (type->IsData()) { + return ReadSkiffData(type, nativeYtTypeFlags, buf); + } + + if (type->IsPg()) { + return ReadSkiffPg(static_cast<TPgType*>(type), buf); + } + + if (type->IsOptional()) { + auto marker = buf.Read(); + if (!marker) { + return NUdf::TUnboxedValue(); + } + + auto value = ReadSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, holderFactory, buf); + return value.Release().MakeOptional(); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + NUdf::TUnboxedValue* items; + auto value = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), items); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + items[i] = ReadSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, buf); + } + + return value; + } + + if (type->IsStruct()) { + auto structType = AS_TYPE(TStructType, type); + NUdf::TUnboxedValue* items; + auto value = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), items); + + if (auto cookie = type->GetCookie()) { + const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + const auto ndx = reorder[i]; + items[ndx] = ReadSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, holderFactory, buf); + } + } else { + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + items[i] = ReadSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, buf); + } + } + + return value; + } + + if (type->IsList()) { + auto itemType = AS_TYPE(TListType, type)->GetItemType(); + TDefaultListRepresentation items; + while (buf.Read() == '\0') { + items = items.Append(ReadSkiffNativeYtValue(itemType, nativeYtTypeFlags, holderFactory, buf)); + } + + return holderFactory.CreateDirectListHolder(std::move(items)); + } + + if (type->IsVariant()) { + auto varType = AS_TYPE(TVariantType, type); + ui16 data = 0; + if (varType->GetAlternativesCount() < 256) { + buf.ReadMany((char*)&data, 1); + } else { + buf.ReadMany((char*)&data, sizeof(data)); + } + if (varType->GetUnderlyingType()->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType()); + YQL_ENSURE(data < tupleType->GetElementsCount()); + auto item = ReadSkiffNativeYtValue(tupleType->GetElementType(data), nativeYtTypeFlags, holderFactory, buf); + return holderFactory.CreateVariantHolder(item.Release(), data); + } + else { + auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType()); + if (auto cookie = structType->GetCookie()) { + const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); + data = reorder[data]; + } + YQL_ENSURE(data < structType->GetMembersCount()); + + auto item = ReadSkiffNativeYtValue(structType->GetMemberType(data), nativeYtTypeFlags, holderFactory, buf); + return holderFactory.CreateVariantHolder(item.Release(), data); + } + } + + if (type->IsVoid()) { + return NUdf::TUnboxedValue::Zero(); + } + + if (type->IsNull()) { + return NUdf::TUnboxedValue(); + } + + if (type->IsEmptyList() || type->IsEmptyDict()) { + return holderFactory.GetEmptyContainerLazy(); + } + + if (type->IsDict()) { + auto dictType = AS_TYPE(TDictType, type); + auto keyType = dictType->GetKeyType(); + auto payloadType = dictType->GetPayloadType(); + + auto builder = holderFactory.NewDict(dictType, NUdf::TDictFlags::EDictKind::Hashed); + while (buf.Read() == '\0') { + auto key = ReadSkiffNativeYtValue(keyType, nativeYtTypeFlags, holderFactory, buf); + auto payload = ReadSkiffNativeYtValue(payloadType, nativeYtTypeFlags, holderFactory, buf); + builder->Add(std::move(key), std::move(payload)); + } + + return builder->Build(); + } + + YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); +} + +NUdf::TUnboxedValue ReadSkiffData(TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf) { + auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); + switch (schemeType) { + case NUdf::TDataType<bool>::Id: { + ui8 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(data != 0); + } + + case NUdf::TDataType<ui8>::Id: { + ui64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(ui8(data)); + } + + case NUdf::TDataType<i8>::Id: { + i64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(i8(data)); + } + + case NUdf::TDataType<NUdf::TDate>::Id: + case NUdf::TDataType<ui16>::Id: { + ui64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(ui16(data)); + } + + case NUdf::TDataType<i16>::Id: { + i64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(i16(data)); + } + + case NUdf::TDataType<NUdf::TDate32>::Id: + case NUdf::TDataType<i32>::Id: { + i64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(i32(data)); + } + + case NUdf::TDataType<NUdf::TDatetime>::Id: + case NUdf::TDataType<ui32>::Id: { + ui64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(ui32(data)); + } + + case NUdf::TDataType<NUdf::TInterval>::Id: + case NUdf::TDataType<NUdf::TInterval64>::Id: + case NUdf::TDataType<NUdf::TDatetime64>::Id: + case NUdf::TDataType<NUdf::TTimestamp64>::Id: + case NUdf::TDataType<i64>::Id: { + i64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(data); + } + + case NUdf::TDataType<NUdf::TTimestamp>::Id: + case NUdf::TDataType<ui64>::Id: { + ui64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(data); + } + + case NUdf::TDataType<float>::Id: { + double data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(float(data)); + } + + case NUdf::TDataType<double>::Id: { + double data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(data); + } + + case NUdf::TDataType<NUdf::TUtf8>::Id: + case NUdf::TDataType<char*>::Id: + case NUdf::TDataType<NUdf::TJson>::Id: + case NUdf::TDataType<NUdf::TYson>::Id: + case NUdf::TDataType<NUdf::TDyNumber>::Id: + case NUdf::TDataType<NUdf::TUuid>::Id: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + auto str = NUdf::TUnboxedValue(MakeStringNotFilled(size)); + buf.ReadMany(str.AsStringRef().Data(), size); + return str; + } + + case NUdf::TDataType<NUdf::TDecimal>::Id: { + if (nativeYtTypeFlags & NTCF_DECIMAL) { + auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); + if (params.first < 10) { + i32 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data)); + } else if (params.first < 19) { + i64 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data)); + } else { + YQL_ENSURE(params.first < 36); + NDecimal::TInt128 data; + buf.ReadMany((char*)&data, sizeof(data)); + return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data)); + } + } else { + ui32 size; + buf.ReadMany(reinterpret_cast<char*>(&size), sizeof(size)); + const auto maxSize = sizeof(NDecimal::TInt128); + YQL_ENSURE(size > 0U && size <= maxSize, "Bad decimal field size: " << size); + char data[maxSize]; + buf.ReadMany(data, size); + const auto& v = NDecimal::Deserialize(data, size); + YQL_ENSURE(!NDecimal::IsError(v.first), "Bad decimal field data: " << data); + YQL_ENSURE(size == v.second, "Bad decimal field size: " << size); + return NUdf::TUnboxedValuePod(v.first); + } + } + + case NUdf::TDataType<NUdf::TTzDate>::Id: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + auto& vec = buf.YsonBuffer(); + vec.resize(size); + buf.ReadMany(vec.data(), size); + ui16 value; + ui16 tzId; + YQL_ENSURE(DeserializeTzDate(TStringBuf(vec.begin(), vec.end()), value, tzId)); + auto data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TTzDatetime>::Id: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + auto& vec = buf.YsonBuffer(); + vec.resize(size); + buf.ReadMany(vec.data(), size); + ui32 value; + ui16 tzId; + YQL_ENSURE(DeserializeTzDatetime(TStringBuf(vec.begin(), vec.end()), value, tzId)); + auto data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + auto& vec = buf.YsonBuffer(); + vec.resize(size); + buf.ReadMany(vec.data(), size); + ui64 value; + ui16 tzId; + YQL_ENSURE(DeserializeTzTimestamp(TStringBuf(vec.begin(), vec.end()), value, tzId)); + auto data = NUdf::TUnboxedValuePod(value); + data.SetTimezoneId(tzId); + return data; + } + + case NUdf::TDataType<NUdf::TJsonDocument>::Id: { + ui32 size; + buf.ReadMany((char*)&size, sizeof(size)); + CHECK_STRING_LENGTH_UNSIGNED(size); + auto json = NUdf::TUnboxedValue(MakeStringNotFilled(size)); + buf.ReadMany(json.AsStringRef().Data(), size); + return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef()); + } + + default: + YQL_ENSURE(false, "Unsupported data type: " << schemeType); + } +} + +void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { + auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); + switch (schemeType) { + case NUdf::TDataType<bool>::Id: { + ui8 data = value.Get<ui8>(); + buf.Write(data); + break; + } + + case NUdf::TDataType<ui8>::Id: { + ui64 data = value.Get<ui8>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<i8>::Id: { + i64 data = value.Get<i8>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<NUdf::TDate>::Id: + case NUdf::TDataType<ui16>::Id: { + ui64 data = value.Get<ui16>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<i16>::Id: { + i64 data = value.Get<i16>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<NUdf::TDate32>::Id: + case NUdf::TDataType<i32>::Id: { + i64 data = value.Get<i32>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<NUdf::TDatetime>::Id: + case NUdf::TDataType<ui32>::Id: { + ui64 data = value.Get<ui32>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<NUdf::TInterval>::Id: + case NUdf::TDataType<NUdf::TInterval64>::Id: + case NUdf::TDataType<NUdf::TDatetime64>::Id: + case NUdf::TDataType<NUdf::TTimestamp64>::Id: + case NUdf::TDataType<i64>::Id: { + i64 data = value.Get<i64>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<NUdf::TTimestamp>::Id: + case NUdf::TDataType<ui64>::Id: { + ui64 data = value.Get<ui64>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<float>::Id: { + double data = value.Get<float>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<double>::Id: { + double data = value.Get<double>(); + buf.WriteMany((const char*)&data, sizeof(data)); + break; + } + + case NUdf::TDataType<NUdf::TUtf8>::Id: + case NUdf::TDataType<char*>::Id: + case NUdf::TDataType<NUdf::TJson>::Id: + case NUdf::TDataType<NUdf::TYson>::Id: + case NUdf::TDataType<NUdf::TDyNumber>::Id: + case NUdf::TDataType<NUdf::TUuid>::Id: { + auto str = value.AsStringRef(); + ui32 size = str.Size(); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany(str); + break; + } + + case NUdf::TDataType<NUdf::TDecimal>::Id: { + if (nativeYtTypeFlags & NTCF_DECIMAL) { + auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); + const NDecimal::TInt128 data128 = value.GetInt128(); + if (params.first < 10) { + auto data = NDecimal::ToYtDecimal<i32>(data128); + buf.WriteMany((const char*)&data, sizeof(data)); + } else if (params.first < 19) { + auto data = NDecimal::ToYtDecimal<i64>(data128); + buf.WriteMany((const char*)&data, sizeof(data)); + } else { + YQL_ENSURE(params.first < 36); + auto data = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128); + buf.WriteMany((const char*)&data, sizeof(data)); + } + } else { + char data[sizeof(NDecimal::TInt128)]; + const ui32 size = NDecimal::Serialize(value.GetInt128(), data); + buf.WriteMany(reinterpret_cast<const char*>(&size), sizeof(size)); + buf.WriteMany(data, size); + } + break; + } + + case NUdf::TDataType<NUdf::TTzDate>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui16 data = SwapBytes(value.Get<ui16>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzDatetime>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui32 data = SwapBytes(value.Get<ui32>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzTimestamp>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui64 data = SwapBytes(value.Get<ui64>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzDate32>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzDatetime64>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: { + ui16 tzId = SwapBytes(value.GetTimezoneId()); + ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); + ui32 size = sizeof(data) + sizeof(tzId); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany((const char*)&data, sizeof(data)); + buf.WriteMany((const char*)&tzId, sizeof(tzId)); + break; + } + + case NUdf::TDataType<NUdf::TJsonDocument>::Id: { + NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); + auto str = json.AsStringRef(); + ui32 size = str.Size(); + buf.WriteMany((const char*)&size, sizeof(size)); + buf.WriteMany(str); + break; + } + + default: + YQL_ENSURE(false, "Unsupported data type: " << schemeType); + } +} + +void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { + if (type->IsData()) { + WriteSkiffData(type, nativeYtTypeFlags, value, buf); + } else if (type->IsPg()) { + WriteSkiffPgValue(static_cast<TPgType*>(type), value, buf); + } else if (type->IsOptional()) { + if (!value) { + buf.Write('\0'); + return; + } + + buf.Write('\1'); + WriteSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, value.GetOptionalValue(), buf); + } else if (type->IsList()) { + auto itemType = AS_TYPE(TListType, type)->GetItemType(); + auto elements = value.GetElements(); + if (elements) { + ui32 size = value.GetListLength(); + for (ui32 i = 0; i < size; ++i) { + buf.Write('\0'); + WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, elements[i], buf); + } + } else { + NUdf::TUnboxedValue item; + for (auto iter = value.GetListIterator(); iter.Next(item); ) { + buf.Write('\0'); + WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, item, buf); + } + } + + buf.Write('\xff'); + } else if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, elements[i], buf); + } + } else { + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), buf); + } + } + } else if (type->IsStruct()) { + auto structType = AS_TYPE(TStructType, type); + auto elements = value.GetElements(); + if (auto cookie = type->GetCookie()) { + const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); + if (elements) { + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + const auto ndx = reorder[i]; + WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, elements[ndx], buf); + } + } else { + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + const auto ndx = reorder[i]; + WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, value.GetElement(ndx), buf); + } + } + } else { + if (elements) { + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, elements[i], buf); + } + } else { + for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { + WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), buf); + } + } + } + } else if (type->IsVariant()) { + auto varType = AS_TYPE(TVariantType, type); + ui16 index = (ui16)value.GetVariantIndex(); + if (varType->GetAlternativesCount() < 256) { + buf.WriteMany((const char*)&index, 1); + } else { + buf.WriteMany((const char*)&index, sizeof(index)); + } + + if (varType->GetUnderlyingType()->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType()); + WriteSkiffNativeYtValue(tupleType->GetElementType(index), nativeYtTypeFlags, value.GetVariantItem(), buf); + } else { + auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType()); + if (auto cookie = structType->GetCookie()) { + const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie); + index = reorder[index]; + } + YQL_ENSURE(index < structType->GetMembersCount()); + + WriteSkiffNativeYtValue(structType->GetMemberType(index), nativeYtTypeFlags, value.GetVariantItem(), buf); + } + } else if (type->IsVoid() || type->IsNull() || type->IsEmptyList() || type->IsEmptyDict()) { + } else if (type->IsDict()) { + auto dictType = AS_TYPE(TDictType, type); + auto keyType = dictType->GetKeyType(); + auto payloadType = dictType->GetPayloadType(); + NUdf::TUnboxedValue key, payload; + for (auto iter = value.GetDictIterator(); iter.NextPair(key, payload); ) { + buf.Write('\0'); + WriteSkiffNativeYtValue(keyType, nativeYtTypeFlags, key, buf); + WriteSkiffNativeYtValue(payloadType, nativeYtTypeFlags, payload, buf); + } + + buf.Write('\xff'); + } else { + YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); + } +} + +extern "C" void WriteContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { + WriteSkiffNativeYtValue(type, nativeYtTypeFlags, value, buf); +} + } // NYql diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h index c7bae246ac..f7ea0e0b78 100644 --- a/yt/yql/providers/yt/codec/yt_codec.h +++ b/yt/yql/providers/yt/codec/yt_codec.h @@ -257,4 +257,30 @@ public: using IMkqlWriterImplPtr = TIntrusivePtr<IMkqlWriterImpl>; +NKikimr::NUdf::TUnboxedValue ReadYsonValueInTableFormat(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, NCommon::TInputBuf& buf); +TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonValueInTableFormat(const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const TStringBuf& yson, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err); +TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonNode(const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const NYT::TNode& node, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err); +extern "C" void ReadYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, + bool wrapOptional); +extern "C" void ReadContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, + bool wrapOptional); +void WriteYsonValueInTableFormat(NCommon::TOutputBuf& buf, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, bool topLevel); +extern "C" void WriteYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); + +void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf); +NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, NCommon::TInputBuf& buf); +NKikimr::NUdf::TUnboxedValue ReadSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf); +void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); +void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); +extern "C" void WriteContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, + const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); + } // NYql diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp index aa47453ea2..82b1a3e2f8 100644 --- a/yt/yql/providers/yt/codec/yt_codec_io.cpp +++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp @@ -901,12 +901,12 @@ protected: return NUdf::TUnboxedValue(); } auto& decoder = *SpecsCache_.GetSpecs().Inputs[TableIndex_]; - auto val = ReadYsonValue((decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? type : uwrappedType, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_, true); + auto val = ReadYsonValueInTableFormat((decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? type : uwrappedType, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_); return (decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? val : val.Release().MakeOptional(); } else { if (Y_LIKELY(cmd != EntitySymbol)) { auto& decoder = *SpecsCache_.GetSpecs().Inputs[TableIndex_]; - return ReadYsonValue(type, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_, true); + return ReadYsonValueInTableFormat(type, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_); } if (type->GetKind() == TType::EKind::Data && static_cast<TDataType*>(type)->GetSchemeType() == NUdf::TDataType<NUdf::TYson>::Id) { @@ -1367,7 +1367,7 @@ protected: } if (uwrappedType->IsData()) { - return NCommon::ReadSkiffData(uwrappedType, 0, Buf_); + return ReadSkiffData(uwrappedType, 0, Buf_); } else if (!isOptional && uwrappedType->IsPg()) { return NCommon::ReadSkiffPg(static_cast<TPgType*>(uwrappedType), Buf_); } else { @@ -1378,17 +1378,17 @@ protected: // parse binary yson... YQL_ENSURE(size > 0); char cmd = Buf_.Read(); - auto value = ReadYsonValue(uwrappedType, 0, SpecsCache_.GetHolderFactory(), cmd, Buf_, true); + auto value = ReadYsonValueInTableFormat(uwrappedType, 0, SpecsCache_.GetHolderFactory(), cmd, Buf_); return isOptional ? value.Release().MakeOptional() : value; } } NUdf::TUnboxedValue ReadSkiffFieldNativeYt(TType* type, ui64 nativeYtTypeFlags) { - return NCommon::ReadSkiffNativeYtValue(type, nativeYtTypeFlags, SpecsCache_.GetHolderFactory(), Buf_); + return ReadSkiffNativeYtValue(type, nativeYtTypeFlags, SpecsCache_.GetHolderFactory(), Buf_); } void SkipSkiffField(TType* type, ui64 nativeYtTypeFlags) { - return NCommon::SkipSkiffField(type, nativeYtTypeFlags, Buf_); + return ::NYql::SkipSkiffField(type, nativeYtTypeFlags, Buf_); } }; @@ -2065,9 +2065,9 @@ protected: void WriteSkiffValue(TType* type, const NUdf::TUnboxedValuePod& value, bool wasOptional) { if (NativeYtTypeFlags_) { - NCommon::WriteSkiffNativeYtValue(type, NativeYtTypeFlags_, value, Buf_); + WriteSkiffNativeYtValue(type, NativeYtTypeFlags_, value, Buf_); } else if (type->IsData()) { - NCommon::WriteSkiffData(type, 0, value, Buf_); + WriteSkiffData(type, 0, value, Buf_); } else if (!wasOptional && type->IsPg()) { NCommon::WriteSkiffPg(static_cast<TPgType*>(type), value, Buf_); } else { diff --git a/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp b/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp index d7800dc2d7..c901181ff5 100644 --- a/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp +++ b/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp @@ -197,7 +197,7 @@ bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TNod YQL_ENSURE(rec.GetType() == NYT::TNode::EType::Map, "Expected map node"); TStringStream err; - auto value = NCommon::ParseYsonNode(specsCache.GetHolderFactory(), rec, Specs.Outputs[0].RowType, Specs.Outputs[0].NativeYtTypeFlags, &err); + auto value = ParseYsonNode(specsCache.GetHolderFactory(), rec, Specs.Outputs[0].RowType, Specs.Outputs[0].NativeYtTypeFlags, &err); if (!value) { throw yexception() << "Could not parse yson node with error: " << err.Str(); } |