about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author vvvv <vvvv@yandex-team.com> 2025-02-13 22:36:29 +0300
committer vvvv <vvvv@yandex-team.com> 2025-02-13 22:53:27 +0300
commit dcc9572fc76d9c740c97de127bdee1e0b71273b0 (patch)
tree 48e1e3c9e31e31a346e6905dcf4dc709dd6be802
parent a487ac9a1cd2053ec24449dd6c4a7e70c1cc2b8d (diff)
download ydb-dcc9572fc76d9c740c97de127bdee1e0b71273b0.tar.gz
Refactor out codec for table format
init commit_hash:65402bd8880a077306c1ded09b6d1aa8ea03cd1e
-rw-r--r-- yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp 7
-rw-r--r-- yql/essentials/minikql/protobuf_udf/ut/ya.make 1
-rw-r--r-- yql/essentials/providers/common/codec/ya.make 1
-rw-r--r-- yql/essentials/providers/common/codec/yql_codec.cpp 1691
-rw-r--r-- yql/essentials/providers/common/codec/yql_codec.h 34
-rw-r--r-- yt/yql/providers/yt/codec/codegen/ya.make.inc 1
-rw-r--r-- yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp 7
-rw-r--r-- yt/yql/providers/yt/codec/ya.make 1
-rw-r--r-- yt/yql/providers/yt/codec/yt_codec.cpp 1918
-rw-r--r-- yt/yql/providers/yt/codec/yt_codec.h 26
-rw-r--r-- yt/yql/providers/yt/codec/yt_codec_io.cpp 16
-rw-r--r-- yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp 2
12 files changed, 2056 insertions, 1649 deletions
diff --git a/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp b/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp
index 4d8a77dced..a3ed5f796b 100644
--- a/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp
+++ b/yql/essentials/minikql/protobuf_udf/ut/value_builder_ut.cpp
@@ -5,6 +5,7 @@
#include <yql/essentials/providers/common/codec/yql_codec.h>
#include <yql/essentials/providers/common/codec/yql_codec_buf.h>
+#include <yt/yql/providers/yt/codec/yt_codec.h>
#include <yql/essentials/minikql/mkql_alloc.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
@@ -58,12 +59,12 @@ struct TSetup {
template <typename TProto>
TString YsonToProtoText(TSetup& setup, NUdf::TProtoInfo& info, TStringBuf yson) {
TStringStream err;
- auto val = NCommon::ParseYsonValue(
+ auto val = ParseYsonValueInTableFormat(
setup.HolderFactory,
NYT::NodeToYsonString(NYT::NodeFromYsonString(yson), ::NYson::EYsonFormat::Binary),
static_cast<NKikimr::NMiniKQL::TStructType*>(info.StructType),
0,
- &err, true);
+ &err);
if (!val) {
throw yexception() << err.Str();
}
@@ -120,7 +121,7 @@ TString ProtoTextToYson(TSetup& setup, NUdf::TProtoInfo& info, TStringBuf protoT
auto value = FillValueFromProto(proto, &setup.ValueBuilder, info);
TTestWriter out;
NCommon::TOutputBuf buf(out, nullptr);
- NCommon::WriteYsonValueInTableFormat(buf, static_cast<NKikimr::NMiniKQL::TStructType*>(info.StructType), 0, value, true);
+ WriteYsonValueInTableFormat(buf, static_cast<NKikimr::NMiniKQL::TStructType*>(info.StructType), 0, value, true);
buf.Finish();
return NYT::NodeToYsonString(NYT::NodeFromYsonString(out.Str()), ::NYson::EYsonFormat::Text);
diff --git a/yql/essentials/minikql/protobuf_udf/ut/ya.make b/yql/essentials/minikql/protobuf_udf/ut/ya.make
index dd4741870f..e788d5da8d 100644
--- a/yql/essentials/minikql/protobuf_udf/ut/ya.make
+++ b/yql/essentials/minikql/protobuf_udf/ut/ya.make
@@ -11,6 +11,7 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/lib/schema
yt/yql/providers/yt/common
+ yt/yql/providers/yt/codec
yql/essentials/public/udf/service/exception_policy
yql/essentials/minikql
yql/essentials/public/udf
diff --git a/yql/essentials/providers/common/codec/ya.make b/yql/essentials/providers/common/codec/ya.make
index ca9d05dd2a..a55ef22bbf 100644
--- a/yql/essentials/providers/common/codec/ya.make
+++ b/yql/essentials/providers/common/codec/ya.make
@@ -19,7 +19,6 @@ PEERDIR(
library/cpp/yson
library/cpp/json
library/cpp/enumbitset
- yt/yt/library/decimal
)
YQL_LAST_ABI_VERSION()
diff --git a/yql/essentials/providers/common/codec/yql_codec.cpp b/yql/essentials/providers/common/codec/yql_codec.cpp
index 6710498276..04517c6e2e 100644
--- a/yql/essentials/providers/common/codec/yql_codec.cpp
+++ b/yql/essentials/providers/common/codec/yql_codec.cpp
@@ -23,8 +23,6 @@
#include <util/string/cast.h>
#include <util/generic/map.h>
-#include <yt/yt/library/decimal/decimal.h>
-
namespace NYql {
namespace NCommon {
@@ -289,7 +287,7 @@ TMaybe<TVector<ui32>> CreateStructPositions(TType* inputType, const TVector<TStr
if (inputType->GetKind() != TType::EKind::Struct) {
return Nothing();
}
-
+
auto inputStruct = AS_TYPE(TStructType, inputType);
TMap<TStringBuf, ui32> members;
TVector<ui32> structPositions(inputStruct->GetMembersCount(), Max<ui32>());
@@ -784,119 +782,66 @@ T ReadNextSerializedNumber(char cmd, TInputBuf& buf) {
}
template <typename T>
-T ReadYsonFloatNumber(char cmd, TInputBuf& buf, bool isTableFormat) {
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, DoubleMarker);
- double dbl;
- buf.ReadMany((char*)&dbl, sizeof(dbl));
- return dbl;
- }
-
+T ReadYsonFloatNumber(char cmd, TInputBuf& buf) {
return ReadNextSerializedNumber<T>(cmd, buf);
}
-NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf, bool isTableFormat) {
+NUdf::TUnboxedValue ReadYsonValue(TType* type,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf) {
switch (type->GetKind()) {
case TType::EKind::Variant: {
auto varType = static_cast<TVariantType*>(type);
auto underlyingType = varType->GetUnderlyingType();
- if (isTableFormat && (nativeYtTypeFlags & NTCF_COMPLEX)) {
- CHECK_EXPECTED(cmd, BeginListSymbol);
- cmd = buf.Read();
- TType* type = nullptr;
- i64 index = 0;
- if (cmd == StringMarker) {
- YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
- auto structType = static_cast<TStructType*>(underlyingType);
- auto nameBuffer = ReadNextString(cmd, buf);
- auto foundIndex = structType->FindMemberIndex(nameBuffer);
- YQL_ENSURE(foundIndex.Defined(), "Unexpected member: " << nameBuffer);
- index = *foundIndex;
- type = varType->GetAlternativeType(index);
- } else {
- YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
- YQL_ENSURE(underlyingType->IsTuple(), "Expected tuple as underlying type");
- if (cmd == Uint64Marker) {
- index = buf.ReadVarUI64();
- } else {
- index = buf.ReadVarI64();
- }
- YQL_ENSURE(0 <= index && index < varType->GetAlternativesCount(), "Unexpected member index: " << index);
- type = varType->GetAlternativeType(index);
- }
- cmd = buf.Read();
- CHECK_EXPECTED(cmd, ListItemSeparatorSymbol);
- cmd = buf.Read();
- auto value = ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
- cmd = buf.Read();
- if (cmd != EndListSymbol) {
- CHECK_EXPECTED(cmd, ListItemSeparatorSymbol);
- cmd = buf.Read();
- CHECK_EXPECTED(cmd, EndListSymbol);
- }
- return holderFactory.CreateVariantHolder(value.Release(), index);
- } else {
- if (cmd == StringMarker) {
- YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
- auto name = ReadNextString(cmd, buf);
- auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
- YQL_ENSURE(index, "Unexpected member: " << name);
- YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type");
- return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index);
- }
-
- CHECK_EXPECTED(cmd, BeginListSymbol);
- cmd = buf.Read();
- i64 index = 0;
- if (isTableFormat) {
- YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
- if (cmd == Uint64Marker) {
- index = buf.ReadVarUI64();
- } else {
- index = buf.ReadVarI64();
- }
- } else {
- if (cmd == BeginListSymbol) {
- cmd = buf.Read();
- YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
- auto name = ReadNextString(cmd, buf);
- auto foundIndex = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
- YQL_ENSURE(foundIndex, "Unexpected member: " << name);
- index = *foundIndex;
- cmd = buf.Read();
- if (cmd == ListItemSeparatorSymbol) {
- cmd = buf.Read();
- }
-
- CHECK_EXPECTED(cmd, EndListSymbol);
- } else {
- index = ReadNextSerializedNumber<ui64>(cmd, buf);
- }
- }
-
- YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " <<
- varType->GetAlternativesCount() << " are available");
- YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
- TType* itemType;
- if (underlyingType->IsTuple()) {
- itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
- }
- else {
- itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
- }
+ if (cmd == StringMarker) {
+ YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
+ auto name = ReadNextString(cmd, buf);
+ auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
+ YQL_ENSURE(index, "Unexpected member: " << name);
+ YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type");
+ return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index);
+ }
- EXPECTED(buf, ListItemSeparatorSymbol);
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+ i64 index = 0;
+ if (cmd == BeginListSymbol) {
cmd = buf.Read();
- auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
+ auto name = ReadNextString(cmd, buf);
+ auto foundIndex = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
+ YQL_ENSURE(foundIndex, "Unexpected member: " << name);
+ index = *foundIndex;
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
}
CHECK_EXPECTED(cmd, EndListSymbol);
- return holderFactory.CreateVariantHolder(value.Release(), index);
+ } else {
+ index = ReadNextSerializedNumber<ui64>(cmd, buf);
}
+
+ YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " <<
+ varType->GetAlternativesCount() << " are available");
+ YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
+ TType* itemType;
+ if (underlyingType->IsTuple()) {
+ itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
+ }
+ else {
+ itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
+ }
+
+ EXPECTED(buf, ListItemSeparatorSymbol);
+ cmd = buf.Read();
+ auto value = ReadYsonValue(itemType, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return holderFactory.CreateVariantHolder(value.Release(), index);
}
case TType::EKind::Data: {
@@ -907,117 +852,49 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
return NUdf::TUnboxedValuePod(cmd == TrueMarker);
case NUdf::TDataType<ui8>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod(ui8(buf.ReadVarUI64()));
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui8>(cmd, buf));
case NUdf::TDataType<i8>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod(i8(buf.ReadVarI64()));
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i8>(cmd, buf));
case NUdf::TDataType<ui16>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod(ui16(buf.ReadVarUI64()));
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui16>(cmd, buf));
case NUdf::TDataType<i16>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod(i16(buf.ReadVarI64()));
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i16>(cmd, buf));
case NUdf::TDataType<i32>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod(i32(buf.ReadVarI64()));
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i32>(cmd, buf));
case NUdf::TDataType<ui32>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod(ui32(buf.ReadVarUI64()));
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui32>(cmd, buf));
case NUdf::TDataType<i64>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod(buf.ReadVarI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i64>(cmd, buf));
case NUdf::TDataType<ui64>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod(buf.ReadVarUI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui64>(cmd, buf));
case NUdf::TDataType<float>::Id:
- return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<float>(cmd, buf, isTableFormat));
+ return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<float>(cmd, buf));
case NUdf::TDataType<double>::Id:
- return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<double>(cmd, buf, isTableFormat));
+ return NUdf::TUnboxedValuePod(ReadYsonFloatNumber<double>(cmd, buf));
case NUdf::TDataType<NUdf::TUtf8>::Id:
case NUdf::TDataType<char*>::Id:
case NUdf::TDataType<NUdf::TJson>::Id:
case NUdf::TDataType<NUdf::TDyNumber>::Id:
case NUdf::TDataType<NUdf::TUuid>::Id: {
- if (isTableFormat) {
- auto nextString = ReadNextString(cmd, buf);
- return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(nextString)));
- }
-
return ReadYsonStringInResultFormat(cmd, buf);
}
case NUdf::TDataType<NUdf::TDecimal>::Id: {
auto nextString = ReadNextString(cmd, buf);
- if (isTableFormat) {
- if (nativeYtTypeFlags & NTCF_DECIMAL) {
- auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
- if (params.first < 10) {
- // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf
- NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary32(params.first, nextString));
- YQL_ENSURE(!NDecimal::IsError(res));
- return NUdf::TUnboxedValuePod(res);
- } else if (params.first < 19) {
- NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary64(params.first, nextString));
- YQL_ENSURE(!NDecimal::IsError(res));
- return NUdf::TUnboxedValuePod(res);
- } else {
- YQL_ENSURE(params.first < 36);
- NYT::NDecimal::TDecimal::TValue128 tmpRes = NYT::NDecimal::TDecimal::ParseBinary128(params.first, nextString);
- NDecimal::TInt128 res;
- static_assert(sizeof(NDecimal::TInt128) == sizeof(NYT::NDecimal::TDecimal::TValue128));
- memcpy(&res, &tmpRes, sizeof(NDecimal::TInt128));
- res = NDecimal::FromYtDecimal(res);
- YQL_ENSURE(!NDecimal::IsError(res));
- return NUdf::TUnboxedValuePod(res);
- }
- }
- else {
- const auto& des = NDecimal::Deserialize(nextString.data(), nextString.size());
- YQL_ENSURE(!NDecimal::IsError(des.first));
- YQL_ENSURE(nextString.size() == des.second);
- return NUdf::TUnboxedValuePod(des.first);
- }
- } else {
- const auto params = static_cast<TDataDecimalType*>(type)->GetParams();
- const auto val = NDecimal::FromString(nextString, params.first, params.second);
- YQL_ENSURE(!NDecimal::IsError(val));
- return NUdf::TUnboxedValuePod(val);
- }
+ const auto params = static_cast<TDataDecimalType*>(type)->GetParams();
+ const auto val = NDecimal::FromString(nextString, params.first, params.second);
+ YQL_ENSURE(!NDecimal::IsError(val));
+ return NUdf::TUnboxedValuePod(val);
}
case NUdf::TDataType<NUdf::TYson>::Id: {
@@ -1025,114 +902,55 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
yson.clear();
CopyYsonWithAttrs(cmd, buf, yson);
- if (isTableFormat) {
- return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(yson)));
- }
-
TString decodedYson = NResult::DecodeRestrictedYson(TStringBuf(yson.data(), yson.size()), NYson::EYsonFormat::Text);
return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(decodedYson)));
}
case NUdf::TDataType<NUdf::TDate>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod((ui16)buf.ReadVarUI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui16>(cmd, buf));
case NUdf::TDataType<NUdf::TDatetime>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod((ui32)buf.ReadVarUI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui32>(cmd, buf));
case NUdf::TDataType<NUdf::TTimestamp>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Uint64Marker);
- return NUdf::TUnboxedValuePod(buf.ReadVarUI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<ui64>(cmd, buf));
case NUdf::TDataType<NUdf::TInterval>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod(buf.ReadVarI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i64>(cmd, buf));
case NUdf::TDataType<NUdf::TTzDate>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
- if (isTableFormat) {
- ui16 value;
- ui16 tzId = 0;
- YQL_ENSURE(DeserializeTzDate(nextString, value, tzId));
- data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- } else {
- data = ValueFromString(NUdf::EDataSlot::TzDate, nextString);
- YQL_ENSURE(data, "incorrect tz date format for value " << nextString);
- }
-
+ data = ValueFromString(NUdf::EDataSlot::TzDate, nextString);
+ YQL_ENSURE(data, "incorrect tz date format for value " << nextString);
return data;
}
case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
- if (isTableFormat) {
- ui32 value;
- ui16 tzId = 0;
- YQL_ENSURE(DeserializeTzDatetime(nextString, value, tzId));
- data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- } else {
- data = ValueFromString(NUdf::EDataSlot::TzDatetime, nextString);
- YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString);
- }
-
+ data = ValueFromString(NUdf::EDataSlot::TzDatetime, nextString);
+ YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString);
return data;
}
case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
- if (isTableFormat) {
- ui64 value;
- ui16 tzId = 0;
- YQL_ENSURE(DeserializeTzTimestamp(nextString, value, tzId));
- data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- } else {
- data = ValueFromString(NUdf::EDataSlot::TzTimestamp, nextString);
- YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString);
- }
-
+ data = ValueFromString(NUdf::EDataSlot::TzTimestamp, nextString);
+ YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString);
return data;
}
case NUdf::TDataType<NUdf::TDate32>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod((i32)buf.ReadVarI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i32>(cmd, buf));
case NUdf::TDataType<NUdf::TDatetime64>::Id:
case NUdf::TDataType<NUdf::TTimestamp64>::Id:
case NUdf::TDataType<NUdf::TInterval64>::Id:
- if (isTableFormat) {
- CHECK_EXPECTED(cmd, Int64Marker);
- return NUdf::TUnboxedValuePod(buf.ReadVarI64());
- }
return NUdf::TUnboxedValuePod(ReadNextSerializedNumber<i64>(cmd, buf));
case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
- if (isTableFormat) {
- return ValueFromString(EDataSlot::JsonDocument, ReadNextString(cmd, buf));
- }
-
const auto json = ReadYsonStringInResultFormat(cmd, buf);
return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef());
}
@@ -1140,51 +958,24 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
case NUdf::TDataType<NUdf::TTzDate32>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
- if (isTableFormat) {
- i32 value;
- ui16 tzId = 0;
- YQL_ENSURE(DeserializeTzDate32(nextString, value, tzId));
- data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- } else {
- data = ValueFromString(NUdf::EDataSlot::TzDate32, nextString);
- YQL_ENSURE(data, "incorrect tz date format for value " << nextString);
- }
-
+ data = ValueFromString(NUdf::EDataSlot::TzDate32, nextString);
+ YQL_ENSURE(data, "incorrect tz date format for value " << nextString);
return data;
}
case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
- if (isTableFormat) {
- i64 value;
- ui16 tzId = 0;
- YQL_ENSURE(DeserializeTzDatetime64(nextString, value, tzId));
- data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- } else {
- data = ValueFromString(NUdf::EDataSlot::TzDatetime64, nextString);
- YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString);
- }
-
+ data = ValueFromString(NUdf::EDataSlot::TzDatetime64, nextString);
+ YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString);
return data;
}
case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
auto nextString = ReadNextString(cmd, buf);
NUdf::TUnboxedValuePod data;
- if (isTableFormat) {
- i64 value;
- ui16 tzId = 0;
- YQL_ENSURE(DeserializeTzTimestamp64(nextString, value, tzId));
- data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- } else {
- data = ValueFromString(NUdf::EDataSlot::TzTimestamp64, nextString);
- YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString);
- }
-
+ data = ValueFromString(NUdf::EDataSlot::TzTimestamp64, nextString);
+ YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString);
return data;
}
@@ -1202,7 +993,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
cmd = buf.Read();
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- items[i] = ReadYsonValue(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ items[i] = ReadYsonValue(structType->GetMemberType(i), holderFactory, cmd, buf);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
@@ -1227,11 +1018,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
if (pos && cmd != '#') {
auto memberType = structType->GetMemberType(*pos);
auto unwrappedType = memberType;
- if (!(nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) && isTableFormat && unwrappedType->IsOptional()) {
- unwrappedType = static_cast<TOptionalType*>(unwrappedType)->GetItemType();
- }
-
- items[*pos] = ReadYsonValue(unwrappedType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ items[*pos] = ReadYsonValue(unwrappedType, holderFactory, cmd, buf);
} else {
SkipYson(cmd, buf);
}
@@ -1265,7 +1052,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
break;
}
- items = items.Append(ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat));
+ items = items.Append(ReadYsonValue(itemType, holderFactory, cmd, buf));
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
@@ -1281,40 +1068,24 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
return NUdf::TUnboxedValuePod();
}
auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
- if (isTableFormat && (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX)) {
- if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) {
- CHECK_EXPECTED(cmd, BeginListSymbol);
- cmd = buf.Read();
- auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
- cmd = buf.Read();
- if (cmd == ListItemSeparatorSymbol) {
- cmd = buf.Read();
- }
- CHECK_EXPECTED(cmd, EndListSymbol);
- return value.Release().MakeOptional();
- } else {
- return ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat).Release().MakeOptional();
- }
- } else {
- if (cmd != BeginListSymbol) {
- auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
- return value.Release().MakeOptional();
- }
+ if (cmd != BeginListSymbol) {
+ auto value = ReadYsonValue(itemType, holderFactory, cmd, buf);
+ return value.Release().MakeOptional();
+ }
- cmd = buf.Read();
- if (cmd == EndListSymbol) {
- return NUdf::TUnboxedValuePod();
- }
+ cmd = buf.Read();
+ if (cmd == EndListSymbol) {
+ return NUdf::TUnboxedValuePod();
+ }
- auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ auto value = ReadYsonValue(itemType, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
- if (cmd == ListItemSeparatorSymbol) {
- cmd = buf.Read();
- }
-
- CHECK_EXPECTED(cmd, EndListSymbol);
- return value.Release().MakeOptional();
}
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return value.Release().MakeOptional();
}
case TType::EKind::Dict: {
@@ -1353,7 +1124,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
auto keyStr = NUdf::TUnboxedValue(MakeString(keyBuffer));
EXPECTED(buf, KeyValueSeparatorSymbol);
cmd = buf.Read();
- auto payload = ReadYsonValue(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ auto payload = ReadYsonValue(payloadType, holderFactory, cmd, buf);
map.emplace(std::move(keyStr), std::move(payload));
cmd = buf.Read();
@@ -1378,10 +1149,10 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
CHECK_EXPECTED(cmd, BeginListSymbol);
cmd = buf.Read();
- auto key = ReadYsonValue(keyType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ auto key = ReadYsonValue(keyType, holderFactory, cmd, buf);
EXPECTED(buf, ListItemSeparatorSymbol);
cmd = buf.Read();
- auto payload = ReadYsonValue(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ auto payload = ReadYsonValue(payloadType, holderFactory, cmd, buf);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
@@ -1416,7 +1187,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
cmd = buf.Read();
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- items[i] = ReadYsonValue(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ items[i] = ReadYsonValue(tupleType->GetElementType(i), holderFactory, cmd, buf);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
@@ -1466,12 +1237,12 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
case TType::EKind::Pg: {
auto pgType = static_cast<TPgType*>(type);
- return isTableFormat ? ReadYsonValueInTableFormatPg(pgType, cmd, buf) : ReadYsonValuePg(pgType, cmd, buf);
+ return ReadYsonValuePg(pgType, cmd, buf);
}
case TType::EKind::Tagged: {
auto taggedType = static_cast<TTaggedType*>(type);
- return ReadYsonValue(taggedType->GetBaseType(), nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ return ReadYsonValue(taggedType->GetBaseType(), holderFactory, cmd, buf);
}
default:
@@ -1480,7 +1251,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
}
TMaybe<NUdf::TUnboxedValue> ParseYsonValue(const THolderFactory& holderFactory,
- const TStringBuf& yson, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err, bool isTableFormat) {
+ const TStringBuf& yson, TType* type, IOutputStream* err) {
try {
class TReader : public IBlockReader {
public:
@@ -1520,7 +1291,7 @@ TMaybe<NUdf::TUnboxedValue> ParseYsonValue(const THolderFactory& holderFactory,
TReader reader(yson);
TInputBuf buf(reader, nullptr);
char cmd = buf.Read();
- return ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
+ return ReadYsonValue(type, holderFactory, cmd, buf);
}
catch (const yexception& e) {
if (err) {
@@ -1530,1293 +1301,9 @@ TMaybe<NUdf::TUnboxedValue> ParseYsonValue(const THolderFactory& holderFactory,
}
}
-TMaybe<NUdf::TUnboxedValue> ParseYsonNode(const THolderFactory& holderFactory,
- const NYT::TNode& node, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err) {
- return ParseYsonValue(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, nativeYtTypeFlags, err, true);
-}
-
TMaybe<NUdf::TUnboxedValue> ParseYsonNodeInResultFormat(const THolderFactory& holderFactory,
const NYT::TNode& node, TType* type, IOutputStream* err) {
- return ParseYsonValue(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, 0, err, false);
-}
-
-extern "C" void ReadYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- NUdf::TUnboxedValue& value, TInputBuf& buf, bool wrapOptional) {
- // yson content
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- // parse binary yson...
- YQL_ENSURE(size > 0);
- char cmd = buf.Read();
- auto tmp = ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, true);
- if (!wrapOptional) {
- value = std::move(tmp);
- }
- else {
- value = tmp.Release().MakeOptional();
- }
-}
-
-NUdf::TUnboxedValue ReadSkiffData(TType* type, ui64 nativeYtTypeFlags, TInputBuf& buf) {
- auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
- switch (schemeType) {
- case NUdf::TDataType<bool>::Id: {
- ui8 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(data != 0);
- }
-
- case NUdf::TDataType<ui8>::Id: {
- ui64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(ui8(data));
- }
-
- case NUdf::TDataType<i8>::Id: {
- i64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(i8(data));
- }
-
- case NUdf::TDataType<NUdf::TDate>::Id:
- case NUdf::TDataType<ui16>::Id: {
- ui64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(ui16(data));
- }
-
- case NUdf::TDataType<i16>::Id: {
- i64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(i16(data));
- }
-
- case NUdf::TDataType<NUdf::TDate32>::Id:
- case NUdf::TDataType<i32>::Id: {
- i64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(i32(data));
- }
-
- case NUdf::TDataType<NUdf::TDatetime>::Id:
- case NUdf::TDataType<ui32>::Id: {
- ui64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(ui32(data));
- }
-
- case NUdf::TDataType<NUdf::TInterval>::Id:
- case NUdf::TDataType<NUdf::TInterval64>::Id:
- case NUdf::TDataType<NUdf::TDatetime64>::Id:
- case NUdf::TDataType<NUdf::TTimestamp64>::Id:
- case NUdf::TDataType<i64>::Id: {
- i64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(data);
- }
-
- case NUdf::TDataType<NUdf::TTimestamp>::Id:
- case NUdf::TDataType<ui64>::Id: {
- ui64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(data);
- }
-
- case NUdf::TDataType<float>::Id: {
- double data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(float(data));
- }
-
- case NUdf::TDataType<double>::Id: {
- double data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(data);
- }
-
- case NUdf::TDataType<NUdf::TUtf8>::Id:
- case NUdf::TDataType<char*>::Id:
- case NUdf::TDataType<NUdf::TJson>::Id:
- case NUdf::TDataType<NUdf::TYson>::Id:
- case NUdf::TDataType<NUdf::TDyNumber>::Id:
- case NUdf::TDataType<NUdf::TUuid>::Id: {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- auto str = NUdf::TUnboxedValue(MakeStringNotFilled(size));
- buf.ReadMany(str.AsStringRef().Data(), size);
- return str;
- }
-
- case NUdf::TDataType<NUdf::TDecimal>::Id: {
- if (nativeYtTypeFlags & NTCF_DECIMAL) {
- auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
- if (params.first < 10) {
- i32 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data));
- } else if (params.first < 19) {
- i64 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data));
- } else {
- YQL_ENSURE(params.first < 36);
- NDecimal::TInt128 data;
- buf.ReadMany((char*)&data, sizeof(data));
- return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data));
- }
- } else {
- ui32 size;
- buf.ReadMany(reinterpret_cast<char*>(&size), sizeof(size));
- const auto maxSize = sizeof(NDecimal::TInt128);
- YQL_ENSURE(size > 0U && size <= maxSize, "Bad decimal field size: " << size);
- char data[maxSize];
- buf.ReadMany(data, size);
- const auto& v = NDecimal::Deserialize(data, size);
- YQL_ENSURE(!NDecimal::IsError(v.first), "Bad decimal field data: " << data);
- YQL_ENSURE(size == v.second, "Bad decimal field size: " << size);
- return NUdf::TUnboxedValuePod(v.first);
- }
- }
-
- case NUdf::TDataType<NUdf::TTzDate>::Id: {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- auto& vec = buf.YsonBuffer();
- vec.resize(size);
- buf.ReadMany(vec.data(), size);
- ui16 value;
- ui16 tzId;
- YQL_ENSURE(DeserializeTzDate(TStringBuf(vec.begin(), vec.end()), value, tzId));
- auto data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- return data;
- }
-
- case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- auto& vec = buf.YsonBuffer();
- vec.resize(size);
- buf.ReadMany(vec.data(), size);
- ui32 value;
- ui16 tzId;
- YQL_ENSURE(DeserializeTzDatetime(TStringBuf(vec.begin(), vec.end()), value, tzId));
- auto data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- return data;
- }
-
- case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- auto& vec = buf.YsonBuffer();
- vec.resize(size);
- buf.ReadMany(vec.data(), size);
- ui64 value;
- ui16 tzId;
- YQL_ENSURE(DeserializeTzTimestamp(TStringBuf(vec.begin(), vec.end()), value, tzId));
- auto data = NUdf::TUnboxedValuePod(value);
- data.SetTimezoneId(tzId);
- return data;
- }
-
- case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- auto json = NUdf::TUnboxedValue(MakeStringNotFilled(size));
- buf.ReadMany(json.AsStringRef().Data(), size);
- return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef());
- }
-
- default:
- YQL_ENSURE(false, "Unsupported data type: " << schemeType);
- }
-}
-
-void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, TInputBuf& buf) {
- const bool isOptional = type->IsOptional();
- if (isOptional) {
- // Unwrap optional
- type = static_cast<TOptionalType*>(type)->GetItemType();
- }
-
- if (isOptional) {
- auto marker = buf.Read();
- if (!marker) {
- return;
- }
- }
-
- if (type->IsData()) {
- auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
- switch (schemeType) {
- case NUdf::TDataType<bool>::Id:
- buf.SkipMany(sizeof(ui8));
- break;
-
- case NUdf::TDataType<ui8>::Id:
- case NUdf::TDataType<ui16>::Id:
- case NUdf::TDataType<ui32>::Id:
- case NUdf::TDataType<ui64>::Id:
- case NUdf::TDataType<NUdf::TDate>::Id:
- case NUdf::TDataType<NUdf::TDatetime>::Id:
- case NUdf::TDataType<NUdf::TTimestamp>::Id:
- buf.SkipMany(sizeof(ui64));
- break;
-
- case NUdf::TDataType<i8>::Id:
- case NUdf::TDataType<i16>::Id:
- case NUdf::TDataType<i32>::Id:
- case NUdf::TDataType<i64>::Id:
- case NUdf::TDataType<NUdf::TInterval>::Id:
- case NUdf::TDataType<NUdf::TDate32>::Id:
- case NUdf::TDataType<NUdf::TDatetime64>::Id:
- case NUdf::TDataType<NUdf::TTimestamp64>::Id:
- case NUdf::TDataType<NUdf::TInterval64>::Id:
- buf.SkipMany(sizeof(i64));
- break;
-
- case NUdf::TDataType<float>::Id:
- case NUdf::TDataType<double>::Id:
- buf.SkipMany(sizeof(double));
- break;
-
- case NUdf::TDataType<NUdf::TUtf8>::Id:
- case NUdf::TDataType<char*>::Id:
- case NUdf::TDataType<NUdf::TJson>::Id:
- case NUdf::TDataType<NUdf::TYson>::Id:
- case NUdf::TDataType<NUdf::TUuid>::Id:
- case NUdf::TDataType<NUdf::TDyNumber>::Id:
- case NUdf::TDataType<NUdf::TTzDate>::Id:
- case NUdf::TDataType<NUdf::TTzDatetime>::Id:
- case NUdf::TDataType<NUdf::TTzTimestamp>::Id:
- case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- buf.SkipMany(size);
- break;
- }
- case NUdf::TDataType<NUdf::TDecimal>::Id: {
- if (nativeYtTypeFlags & NTCF_DECIMAL) {
- auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
- if (params.first < 10) {
- buf.SkipMany(sizeof(i32));
- } else if (params.first < 19) {
- buf.SkipMany(sizeof(i64));
- } else {
- buf.SkipMany(sizeof(NDecimal::TInt128));
- }
- } else {
- ui32 size;
- buf.ReadMany((char*)&size, sizeof(size));
- CHECK_STRING_LENGTH_UNSIGNED(size);
- buf.SkipMany(size);
- }
- break;
- }
- default:
- YQL_ENSURE(false, "Unsupported data type: " << schemeType);
- }
- return;
- }
-
- if (type->IsPg()) {
- SkipSkiffPg(static_cast<TPgType*>(type), buf);
- return;
- }
-
- if (type->IsStruct()) {
- auto structType = static_cast<TStructType*>(type);
- const std::vector<size_t>* reorder = nullptr;
- if (auto cookie = structType->GetCookie()) {
- reorder = ((const std::vector<size_t>*)cookie);
- }
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- SkipSkiffField(structType->GetMemberType(reorder ? reorder->at(i) : i), nativeYtTypeFlags, buf);
- }
- return;
- }
-
- if (type->IsList()) {
- auto itemType = static_cast<TListType*>(type)->GetItemType();
- while (buf.Read() == '\0') {
- SkipSkiffField(itemType, nativeYtTypeFlags, buf);
- }
- return;
- }
-
- if (type->IsTuple()) {
- auto tupleType = static_cast<TTupleType*>(type);
-
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- SkipSkiffField(tupleType->GetElementType(i), nativeYtTypeFlags, buf);
- }
- return;
- }
-
- if (type->IsVariant()) {
- auto varType = AS_TYPE(TVariantType, type);
- ui16 data = 0;
- if (varType->GetAlternativesCount() < 256) {
- buf.ReadMany((char*)&data, 1);
- } else {
- buf.ReadMany((char*)&data, sizeof(data));
- }
-
- if (varType->GetUnderlyingType()->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType());
- YQL_ENSURE(data < tupleType->GetElementsCount());
- SkipSkiffField(tupleType->GetElementType(data), nativeYtTypeFlags, buf);
- } else {
- auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType());
- if (auto cookie = structType->GetCookie()) {
- const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
- data = reorder[data];
- }
- YQL_ENSURE(data < structType->GetMembersCount());
-
- SkipSkiffField(structType->GetMemberType(data), nativeYtTypeFlags, buf);
- }
- return;
- }
-
- if (type->IsVoid()) {
- return;
- }
-
- if (type->IsNull()) {
- return;
- }
-
- if (type->IsEmptyList() || type->IsEmptyDict()) {
- return;
- }
-
- if (type->IsDict()) {
- auto dictType = AS_TYPE(TDictType, type);
- auto keyType = dictType->GetKeyType();
- auto payloadType = dictType->GetPayloadType();
- while (buf.Read() == '\0') {
- SkipSkiffField(keyType, nativeYtTypeFlags, buf);
- SkipSkiffField(payloadType, nativeYtTypeFlags, buf);
- }
- return;
- }
-
- YQL_ENSURE(false, "Unsupported type for skip: " << type->GetKindAsStr());
-}
-
-NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory, TInputBuf& buf)
-{
- if (type->IsData()) {
- return ReadSkiffData(type, nativeYtTypeFlags, buf);
- }
-
- if (type->IsPg()) {
- return ReadSkiffPg(static_cast<TPgType*>(type), buf);
- }
-
- if (type->IsOptional()) {
- auto marker = buf.Read();
- if (!marker) {
- return NUdf::TUnboxedValue();
- }
-
- auto value = ReadSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, holderFactory, buf);
- return value.Release().MakeOptional();
- }
-
- if (type->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, type);
- NUdf::TUnboxedValue* items;
- auto value = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), items);
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- items[i] = ReadSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, buf);
- }
-
- return value;
- }
-
- if (type->IsStruct()) {
- auto structType = AS_TYPE(TStructType, type);
- NUdf::TUnboxedValue* items;
- auto value = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), items);
-
- if (auto cookie = type->GetCookie()) {
- const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- const auto ndx = reorder[i];
- items[ndx] = ReadSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, holderFactory, buf);
- }
- } else {
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- items[i] = ReadSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, buf);
- }
- }
-
- return value;
- }
-
- if (type->IsList()) {
- auto itemType = AS_TYPE(TListType, type)->GetItemType();
- TDefaultListRepresentation items;
- while (buf.Read() == '\0') {
- items = items.Append(ReadSkiffNativeYtValue(itemType, nativeYtTypeFlags, holderFactory, buf));
- }
-
- return holderFactory.CreateDirectListHolder(std::move(items));
- }
-
- if (type->IsVariant()) {
- auto varType = AS_TYPE(TVariantType, type);
- ui16 data = 0;
- if (varType->GetAlternativesCount() < 256) {
- buf.ReadMany((char*)&data, 1);
- } else {
- buf.ReadMany((char*)&data, sizeof(data));
- }
- if (varType->GetUnderlyingType()->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType());
- YQL_ENSURE(data < tupleType->GetElementsCount());
- auto item = ReadSkiffNativeYtValue(tupleType->GetElementType(data), nativeYtTypeFlags, holderFactory, buf);
- return holderFactory.CreateVariantHolder(item.Release(), data);
- }
- else {
- auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType());
- if (auto cookie = structType->GetCookie()) {
- const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
- data = reorder[data];
- }
- YQL_ENSURE(data < structType->GetMembersCount());
-
- auto item = ReadSkiffNativeYtValue(structType->GetMemberType(data), nativeYtTypeFlags, holderFactory, buf);
- return holderFactory.CreateVariantHolder(item.Release(), data);
- }
- }
-
- if (type->IsVoid()) {
- return NUdf::TUnboxedValue::Zero();
- }
-
- if (type->IsNull()) {
- return NUdf::TUnboxedValue();
- }
-
- if (type->IsEmptyList() || type->IsEmptyDict()) {
- return holderFactory.GetEmptyContainerLazy();
- }
-
- if (type->IsDict()) {
- auto dictType = AS_TYPE(TDictType, type);
- auto keyType = dictType->GetKeyType();
- auto payloadType = dictType->GetPayloadType();
-
- auto builder = holderFactory.NewDict(dictType, NUdf::TDictFlags::EDictKind::Hashed);
- while (buf.Read() == '\0') {
- auto key = ReadSkiffNativeYtValue(keyType, nativeYtTypeFlags, holderFactory, buf);
- auto payload = ReadSkiffNativeYtValue(payloadType, nativeYtTypeFlags, holderFactory, buf);
- builder->Add(std::move(key), std::move(payload));
- }
-
- return builder->Build();
- }
-
- YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
-}
-
-extern "C" void ReadContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- NUdf::TUnboxedValue& value, TInputBuf& buf, bool wrapOptional) {
- auto tmp = ReadSkiffNativeYtValue(type, nativeYtTypeFlags, holderFactory, buf);
- if (!wrapOptional) {
- value = std::move(tmp);
- } else {
- value = tmp.Release().MakeOptional();
- }
-}
-
-///////////////////////////////////////////
-//
-// Initial state first = last = &dummy
-//
-// +1 block first = &dummy, last = newPage, first.next = newPage, newPage.next= &dummy
-// +1 block first = &dummy, last = newPage2, first.next = newPage, newPage.next = newPage2, newPage2.next = &dummy
-//
-///////////////////////////////////////////
-class TTempBlockWriter : public NCommon::IBlockWriter {
-public:
- TTempBlockWriter()
- : Pool_(*TlsAllocState)
- , Last_(&Dummy_)
- {
- Dummy_.Avail_ = 0;
- Dummy_.Next_ = &Dummy_;
- }
-
- ~TTempBlockWriter() {
- auto current = Dummy_.Next_; // skip dummy node
- while (current != &Dummy_) {
- auto next = current->Next_;
- Pool_.ReturnPage(current);
- current = next;
- }
- }
-
- void SetRecordBoundaryCallback(std::function<void()> callback) override {
- Y_UNUSED(callback);
- }
-
- void WriteBlocks(TOutputBuf& buf) const {
- auto current = Dummy_.Next_; // skip dummy node
- while (current != &Dummy_) {
- auto next = current->Next_;
- buf.WriteMany((const char*)(current + 1), current->Avail_);
- current = next;
- }
- }
-
- TTempBlockWriter(const TTempBlockWriter&) = delete;
- void operator=(const TTempBlockWriter&) = delete;
-
- std::pair<char*, char*> NextEmptyBlock() override {
- auto newPage = Pool_.GetPage();
- auto header = (TPageHeader*)newPage;
- header->Avail_ = 0;
- header->Next_ = &Dummy_;
- Last_->Next_ = header;
- Last_ = header;
- return std::make_pair((char*)(header + 1), (char*)newPage + TAlignedPagePool::POOL_PAGE_SIZE);
- }
-
- void ReturnBlock(size_t avail, std::optional<size_t> lastRecordBoundary) override {
- Y_UNUSED(lastRecordBoundary);
- YQL_ENSURE(avail <= TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TPageHeader));
- Last_->Avail_ = avail;
- }
-
- void Finish() override {
- }
-
-private:
- struct TPageHeader {
- TPageHeader* Next_ = nullptr;
- ui32 Avail_ = 0;
- };
-
- NKikimr::TAlignedPagePool& Pool_;
- TPageHeader* Last_;
- TPageHeader Dummy_;
-};
-
-void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, bool topLevel) {
- // Table format, very compact
- switch (type->GetKind()) {
- case TType::EKind::Variant: {
- buf.Write(BeginListSymbol);
- auto varType = static_cast<TVariantType*>(type);
- auto underlyingType = varType->GetUnderlyingType();
- auto index = value.GetVariantIndex();
- YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << varType->GetAlternativesCount() << " are available");
- YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
- TType* itemType;
- if (underlyingType->IsTuple()) {
- itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
- }
- else {
- itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
- }
- if (!(nativeYtTypeFlags & NTCF_COMPLEX) || underlyingType->IsTuple()) {
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(index);
- } else {
- auto structType = static_cast<TStructType*>(underlyingType);
- auto varName = structType->GetMemberName(index);
- buf.Write(StringMarker);
- buf.WriteVarI32(varName.size());
- buf.WriteMany(varName);
- }
- buf.Write(ListItemSeparatorSymbol);
- WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetVariantItem(), false);
- buf.Write(ListItemSeparatorSymbol);
- buf.Write(EndListSymbol);
- break;
- }
-
- case TType::EKind::Data: {
- auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
- switch (schemeType) {
- case NUdf::TDataType<bool>::Id: {
- buf.Write(value.Get<bool>() ? TrueMarker : FalseMarker);
- break;
- }
-
- case NUdf::TDataType<ui8>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui8>());
- break;
-
- case NUdf::TDataType<i8>::Id:
- buf.Write(Int64Marker);
- buf.WriteVarI64(value.Get<i8>());
- break;
-
- case NUdf::TDataType<ui16>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui16>());
- break;
-
- case NUdf::TDataType<i16>::Id:
- buf.Write(Int64Marker);
- buf.WriteVarI64(value.Get<i16>());
- break;
-
- case NUdf::TDataType<i32>::Id:
- buf.Write(Int64Marker);
- buf.WriteVarI64(value.Get<i32>());
- break;
-
- case NUdf::TDataType<ui32>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui32>());
- break;
-
- case NUdf::TDataType<i64>::Id:
- buf.Write(Int64Marker);
- buf.WriteVarI64(value.Get<i64>());
- break;
-
- case NUdf::TDataType<ui64>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui64>());
- break;
-
- case NUdf::TDataType<float>::Id: {
- buf.Write(DoubleMarker);
- double val = value.Get<float>();
- buf.WriteMany((const char*)&val, sizeof(val));
- break;
- }
-
- case NUdf::TDataType<double>::Id: {
- buf.Write(DoubleMarker);
- double val = value.Get<double>();
- buf.WriteMany((const char*)&val, sizeof(val));
- break;
- }
-
- case NUdf::TDataType<NUdf::TUtf8>::Id:
- case NUdf::TDataType<char*>::Id:
- case NUdf::TDataType<NUdf::TJson>::Id:
- case NUdf::TDataType<NUdf::TDyNumber>::Id:
- case NUdf::TDataType<NUdf::TUuid>::Id: {
- buf.Write(StringMarker);
- auto str = value.AsStringRef();
- buf.WriteVarI32(str.Size());
- buf.WriteMany(str);
- break;
- }
-
- case NUdf::TDataType<NUdf::TDecimal>::Id: {
- buf.Write(StringMarker);
- if (nativeYtTypeFlags & NTCF_DECIMAL){
- auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
- const NDecimal::TInt128 data128 = value.GetInt128();
- char tmpBuf[NYT::NDecimal::TDecimal::MaxBinarySize];
- if (params.first < 10) {
- // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf
- TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary32(params.first, NDecimal::ToYtDecimal<i32>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize);
- buf.WriteVarI32(resBuf.size());
- buf.WriteMany(resBuf.data(), resBuf.size());
- } else if (params.first < 19) {
- TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary64(params.first, NDecimal::ToYtDecimal<i64>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize);
- buf.WriteVarI32(resBuf.size());
- buf.WriteMany(resBuf.data(), resBuf.size());
- } else {
- YQL_ENSURE(params.first < 36);
- NYT::NDecimal::TDecimal::TValue128 val;
- auto data128Converted = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128);
- memcpy(&val, &data128Converted, sizeof(val));
- auto resBuf = NYT::NDecimal::TDecimal::WriteBinary128(params.first, val, tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize);
- buf.WriteVarI32(resBuf.size());
- buf.WriteMany(resBuf.data(), resBuf.size());
- }
- } else {
- char data[sizeof(NDecimal::TInt128)];
- const ui32 size = NDecimal::Serialize(value.GetInt128(), data);
- buf.WriteVarI32(size);
- buf.WriteMany(data, size);
- }
- break;
- }
-
- case NUdf::TDataType<NUdf::TYson>::Id: {
- // embed content
- buf.WriteMany(value.AsStringRef());
- break;
- }
-
- case NUdf::TDataType<NUdf::TDate>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui16>());
- break;
-
- case NUdf::TDataType<NUdf::TDatetime>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui32>());
- break;
-
- case NUdf::TDataType<NUdf::TTimestamp>::Id:
- buf.Write(Uint64Marker);
- buf.WriteVarUI64(value.Get<ui64>());
- break;
-
- case NUdf::TDataType<NUdf::TInterval>::Id:
- case NUdf::TDataType<NUdf::TInterval64>::Id:
- case NUdf::TDataType<NUdf::TDatetime64>::Id:
- case NUdf::TDataType<NUdf::TTimestamp64>::Id:
- buf.Write(Int64Marker);
- buf.WriteVarI64(value.Get<i64>());
- break;
-
- case NUdf::TDataType<NUdf::TDate32>::Id:
- buf.Write(Int64Marker);
- buf.WriteVarI64(value.Get<i32>());
- break;
-
- case NUdf::TDataType<NUdf::TTzDate>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui16 data = SwapBytes(value.Get<ui16>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.Write(StringMarker);
- buf.WriteVarI32(size);
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui32 data = SwapBytes(value.Get<ui32>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.Write(StringMarker);
- buf.WriteVarI32(size);
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui64 data = SwapBytes(value.Get<ui64>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.Write(StringMarker);
- buf.WriteVarI32(size);
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDate32>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.Write(StringMarker);
- buf.WriteVarI32(size);
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.Write(StringMarker);
- buf.WriteVarI32(size);
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.Write(StringMarker);
- buf.WriteVarI32(size);
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
- buf.Write(StringMarker);
- NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
- auto str = json.AsStringRef();
- buf.WriteVarI32(str.Size());
- buf.WriteMany(str);
- break;
- }
-
- default:
- YQL_ENSURE(false, "Unsupported data type: " << schemeType);
- }
-
- break;
- }
-
- case TType::EKind::Struct: {
- auto structType = static_cast<TStructType*>(type);
- if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) {
- buf.Write(BeginMapSymbol);
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- buf.Write(StringMarker);
- auto key = structType->GetMemberName(i);
- buf.WriteVarI32(key.size());
- buf.WriteMany(key);
- buf.Write(KeyValueSeparatorSymbol);
- WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
- buf.Write(KeyedItemSeparatorSymbol);
- }
- buf.Write(EndMapSymbol);
- } else {
- buf.Write(BeginListSymbol);
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
- buf.Write(ListItemSeparatorSymbol);
- }
- buf.Write(EndListSymbol);
- }
- break;
- }
-
- case TType::EKind::List: {
- auto itemType = static_cast<TListType*>(type)->GetItemType();
- const auto iter = value.GetListIterator();
- buf.Write(BeginListSymbol);
- for (NUdf::TUnboxedValue item; iter.Next(item); buf.Write(ListItemSeparatorSymbol)) {
- WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, item, false);
- }
-
- buf.Write(EndListSymbol);
- break;
- }
-
- case TType::EKind::Optional: {
- auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
- if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) {
- if (value) {
- if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) {
- buf.Write(BeginListSymbol);
- }
- WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
- if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) {
- buf.Write(ListItemSeparatorSymbol);
- buf.Write(EndListSymbol);
- }
- } else {
- buf.Write(EntitySymbol);
- }
- } else {
- if (!value) {
- if (topLevel) {
- buf.Write(BeginListSymbol);
- buf.Write(EndListSymbol);
- }
- else {
- buf.Write(EntitySymbol);
- }
- }
- else {
- buf.Write(BeginListSymbol);
- WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
- buf.Write(ListItemSeparatorSymbol);
- buf.Write(EndListSymbol);
- }
- }
- break;
- }
-
- case TType::EKind::Dict: {
- auto dictType = static_cast<TDictType*>(type);
- const auto iter = value.GetDictIterator();
- buf.Write(BeginListSymbol);
- for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
- buf.Write(BeginListSymbol);
- WriteYsonValueInTableFormat(buf, dictType->GetKeyType(), nativeYtTypeFlags, key, false);
- buf.Write(ListItemSeparatorSymbol);
- WriteYsonValueInTableFormat(buf, dictType->GetPayloadType(), nativeYtTypeFlags, payload, false);
- buf.Write(ListItemSeparatorSymbol);
- buf.Write(EndListSymbol);
- buf.Write(ListItemSeparatorSymbol);
- }
-
- buf.Write(EndListSymbol);
- break;
- }
-
- case TType::EKind::Tuple: {
- auto tupleType = static_cast<TTupleType*>(type);
- buf.Write(BeginListSymbol);
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- WriteYsonValueInTableFormat(buf, tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), false);
- buf.Write(ListItemSeparatorSymbol);
- }
-
- buf.Write(EndListSymbol);
- break;
- }
-
- case TType::EKind::Void: {
- buf.Write(EntitySymbol);
- break;
- }
-
- case TType::EKind::Null: {
- buf.Write(EntitySymbol);
- break;
- }
-
- case TType::EKind::EmptyList: {
- buf.Write(BeginListSymbol);
- buf.Write(EndListSymbol);
- break;
- }
-
- case TType::EKind::EmptyDict: {
- buf.Write(BeginListSymbol);
- buf.Write(EndListSymbol);
- break;
- }
-
- case TType::EKind::Pg: {
- auto pgType = static_cast<TPgType*>(type);
- WriteYsonValueInTableFormatPg(buf, pgType, value, topLevel);
- break;
- }
-
- default:
- YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
- }
-}
-
-extern "C" void WriteYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, TOutputBuf& buf) {
- TTempBlockWriter blockWriter;
- TOutputBuf ysonBuf(blockWriter, nullptr);
- WriteYsonValueInTableFormat(ysonBuf, type, nativeYtTypeFlags, value, true);
- ysonBuf.Flush();
- ui32 size = ysonBuf.GetWrittenBytes();
- buf.WriteMany((const char*)&size, sizeof(size));
- blockWriter.WriteBlocks(buf);
-}
-
-extern "C" void WriteContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, TOutputBuf& buf) {
- WriteSkiffNativeYtValue(type, nativeYtTypeFlags, value, buf);
-}
-
-void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { // Serializes one Data-kind value into buf; the byte layout is selected by the scheme type in the switch below. nativeYtTypeFlags is consulted only in the Decimal case.
- auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); // Unchecked downcast: type is assumed to be a TDataType.
- switch (schemeType) {
- case NUdf::TDataType<bool>::Id: {
- ui8 data = value.Get<ui8>(); // bool payload is read as ui8 and written with a single Write call (not widened like the integer cases).
- buf.Write(data);
- break;
- }
-
- case NUdf::TDataType<ui8>::Id: {
- ui64 data = value.Get<ui8>(); // Widened to ui64: sizeof(data) raw bytes are written.
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<i8>::Id: {
- i64 data = value.Get<i8>(); // Widened (sign-extended) to i64 before writing.
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<NUdf::TDate>::Id: // Date shares the ui16 path.
- case NUdf::TDataType<ui16>::Id: {
- ui64 data = value.Get<ui16>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<i16>::Id: {
- i64 data = value.Get<i16>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<NUdf::TDate32>::Id: // Date32 shares the i32 path.
- case NUdf::TDataType<i32>::Id: {
- i64 data = value.Get<i32>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<NUdf::TDatetime>::Id: // Datetime shares the ui32 path.
- case NUdf::TDataType<ui32>::Id: {
- ui64 data = value.Get<ui32>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<NUdf::TInterval>::Id: // All of these are read as i64 and written unchanged.
- case NUdf::TDataType<NUdf::TInterval64>::Id:
- case NUdf::TDataType<NUdf::TDatetime64>::Id:
- case NUdf::TDataType<NUdf::TTimestamp64>::Id:
- case NUdf::TDataType<i64>::Id: {
- i64 data = value.Get<i64>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTimestamp>::Id: // Timestamp shares the ui64 path.
- case NUdf::TDataType<ui64>::Id: {
- ui64 data = value.Get<ui64>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<float>::Id: {
- double data = value.Get<float>(); // float is promoted to double; sizeof(double) bytes are written.
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<double>::Id: {
- double data = value.Get<double>();
- buf.WriteMany((const char*)&data, sizeof(data));
- break;
- }
-
- case NUdf::TDataType<NUdf::TUtf8>::Id: // String-like types: ui32 byte length followed by the bytes themselves.
- case NUdf::TDataType<char*>::Id:
- case NUdf::TDataType<NUdf::TJson>::Id:
- case NUdf::TDataType<NUdf::TYson>::Id:
- case NUdf::TDataType<NUdf::TDyNumber>::Id:
- case NUdf::TDataType<NUdf::TUuid>::Id: {
- auto str = value.AsStringRef();
- ui32 size = str.Size();
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany(str);
- break;
- }
-
- case NUdf::TDataType<NUdf::TDecimal>::Id: {
- if (nativeYtTypeFlags & NTCF_DECIMAL) { // Flag set: fixed-width value produced by NDecimal::ToYtDecimal, no length prefix.
- auto const params = static_cast<TDataDecimalType*>(type)->GetParams(); // params.first selects the width below (presumably the decimal precision - confirm against TDataDecimalType).
- const NDecimal::TInt128 data128 = value.GetInt128();
- if (params.first < 10) { // Written as i32.
- auto data = NDecimal::ToYtDecimal<i32>(data128);
- buf.WriteMany((const char*)&data, sizeof(data));
- } else if (params.first < 19) { // Written as i64.
- auto data = NDecimal::ToYtDecimal<i64>(data128);
- buf.WriteMany((const char*)&data, sizeof(data));
- } else { // Written as TInt128; params.first of 36 or more is rejected.
- YQL_ENSURE(params.first < 36);
- auto data = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128);
- buf.WriteMany((const char*)&data, sizeof(data));
- }
- } else { // Flag clear: NDecimal::Serialize output, prefixed with its ui32 size.
- char data[sizeof(NDecimal::TInt128)];
- const ui32 size = NDecimal::Serialize(value.GetInt128(), data);
- buf.WriteMany(reinterpret_cast<const char*>(&size), sizeof(size));
- buf.WriteMany(data, size);
- }
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDate>::Id: { // Tz types: ui32 size prefix, then the byte-swapped value, then the byte-swapped timezone id.
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui16 data = SwapBytes(value.Get<ui16>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui32 data = SwapBytes(value.Get<ui32>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui64 data = SwapBytes(value.Get<ui64>());
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDate32>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); // Signed variant: XOR with 0x80 toggles bit 7 of the swapped value's low byte - presumably the original sign bit after the swap; confirm against SwapBytes.
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); // Same 0x80 toggle as TzDate32, on a 64-bit value.
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
- ui16 tzId = SwapBytes(value.GetTimezoneId());
- ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); // Same 0x80 toggle as TzDate32, on a 64-bit value.
- ui32 size = sizeof(data) + sizeof(tzId);
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany((const char*)&data, sizeof(data));
- buf.WriteMany((const char*)&tzId, sizeof(tzId));
- break;
- }
-
- case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
- NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); // Converted through ValueToString first, then written like the other string-like types (ui32 length + bytes).
- auto str = json.AsStringRef();
- ui32 size = str.Size();
- buf.WriteMany((const char*)&size, sizeof(size));
- buf.WriteMany(str);
- break;
- }
-
- default:
- YQL_ENSURE(false, "Unsupported data type: " << schemeType); // Any scheme type not listed above is a hard failure.
- }
-}
-
-void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { // Recursively serializes value into buf, dispatching on the kind of type; nativeYtTypeFlags is passed through unchanged to every nested call.
- if (type->IsData()) {
- WriteSkiffData(type, nativeYtTypeFlags, value, buf);
- } else if (type->IsPg()) {
- WriteSkiffPgValue(static_cast<TPgType*>(type), value, buf);
- } else if (type->IsOptional()) { // Optional: one tag byte, '\0' for empty or '\1' followed by the item.
- if (!value) {
- buf.Write('\0');
- return;
- }
-
- buf.Write('\1');
- WriteSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, value.GetOptionalValue(), buf);
- } else if (type->IsList()) { // List: every item is preceded by '\0'; the sequence ends with '\xff'.
- auto itemType = AS_TYPE(TListType, type)->GetItemType();
- auto elements = value.GetElements();
- if (elements) { // Direct element array is available: index into it.
- ui32 size = value.GetListLength();
- for (ui32 i = 0; i < size; ++i) {
- buf.Write('\0');
- WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, elements[i], buf);
- }
- } else { // No direct array: walk the list iterator instead.
- NUdf::TUnboxedValue item;
- for (auto iter = value.GetListIterator(); iter.Next(item); ) {
- buf.Write('\0');
- WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, item, buf);
- }
- }
-
- buf.Write('\xff');
- } else if (type->IsTuple()) { // Tuple: elements written back to back in declaration order, with no separators or terminator.
- auto tupleType = AS_TYPE(TTupleType, type);
- auto elements = value.GetElements();
- if (elements) {
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, elements[i], buf);
- }
- } else {
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), buf);
- }
- }
- } else if (type->IsStruct()) { // Struct: members written back to back, optionally in a remapped order.
- auto structType = AS_TYPE(TStructType, type);
- auto elements = value.GetElements();
- if (auto cookie = type->GetCookie()) { // A non-zero cookie is treated as a std::vector<size_t> mapping output position -> member index.
- const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
- if (elements) {
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- const auto ndx = reorder[i];
- WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, elements[ndx], buf);
- }
- } else {
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- const auto ndx = reorder[i];
- WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, value.GetElement(ndx), buf);
- }
- }
- } else { // No cookie: members are written in their natural index order.
- if (elements) {
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, elements[i], buf);
- }
- } else {
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), buf);
- }
- }
- }
- } else if (type->IsVariant()) { // Variant: alternative index first, then the selected item.
- auto varType = AS_TYPE(TVariantType, type);
- ui16 index = (ui16)value.GetVariantIndex();
- if (varType->GetAlternativesCount() < 256) { // Fewer than 256 alternatives: only the first in-memory byte of the ui16 index is written (its low byte on a little-endian host).
- buf.WriteMany((const char*)&index, 1);
- } else { // Otherwise both bytes of the ui16 index are written.
- buf.WriteMany((const char*)&index, sizeof(index));
- }
-
- if (varType->GetUnderlyingType()->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType());
- WriteSkiffNativeYtValue(tupleType->GetElementType(index), nativeYtTypeFlags, value.GetVariantItem(), buf); // NOTE(review): no explicit bounds check on index in this branch, unlike the struct branch below.
- } else {
- auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType());
- if (auto cookie = structType->GetCookie()) { // The index already written above is the un-remapped one; the remapped index is used only to look up the member type.
- const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
- index = reorder[index];
- }
- YQL_ENSURE(index < structType->GetMembersCount());
-
- WriteSkiffNativeYtValue(structType->GetMemberType(index), nativeYtTypeFlags, value.GetVariantItem(), buf);
- }
- } else if (type->IsVoid() || type->IsNull() || type->IsEmptyList() || type->IsEmptyDict()) { // These kinds produce no bytes at all.
- } else if (type->IsDict()) { // Dict: every (key, payload) pair is preceded by '\0'; the sequence ends with '\xff'.
- auto dictType = AS_TYPE(TDictType, type);
- auto keyType = dictType->GetKeyType();
- auto payloadType = dictType->GetPayloadType();
- NUdf::TUnboxedValue key, payload;
- for (auto iter = value.GetDictIterator(); iter.NextPair(key, payload); ) {
- buf.Write('\0');
- WriteSkiffNativeYtValue(keyType, nativeYtTypeFlags, key, buf);
- WriteSkiffNativeYtValue(payloadType, nativeYtTypeFlags, payload, buf);
- }
-
- buf.Write('\xff');
- } else {
- YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); // Any other type kind is a hard failure.
- }
+ return ParseYsonValue(holderFactory, NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary), type, err);
}
TExprNode::TPtr ValueToExprLiteral(const TTypeAnnotationNode* type, const NKikimr::NUdf::TUnboxedValuePod& value, TExprContext& ctx,
diff --git a/yql/essentials/providers/common/codec/yql_codec.h b/yql/essentials/providers/common/codec/yql_codec.h
index f3f50e5c58..741f0b1966 100644
--- a/yql/essentials/providers/common/codec/yql_codec.h
+++ b/yql/essentials/providers/common/codec/yql_codec.h
@@ -63,43 +63,15 @@ struct TCodecContext {
void SkipYson(char cmd, TInputBuf& buf);
void CopyYson(char cmd, TInputBuf& buf, TVector<char>& yson);
void CopyYsonWithAttrs(char cmd, TInputBuf& buf, TVector<char>& yson);
-NKikimr::NUdf::TUnboxedValue ReadYsonValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf, bool isTableFormat);
+TStringBuf ReadNextString(char cmd, TInputBuf& buf);
+NKikimr::NUdf::TUnboxedValue ReadYsonValue(NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, TInputBuf& buf);
TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonValue(const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- const TStringBuf& yson, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err, bool isTableFormat);
-TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonNode(const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- const NYT::TNode& node, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err);
+ const TStringBuf& yson, NKikimr::NMiniKQL::TType* type, IOutputStream* err);
TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonNodeInResultFormat(const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const NYT::TNode& node, NKikimr::NMiniKQL::TType* type, IOutputStream* err);
-extern "C" void ReadYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf,
- bool wrapOptional);
-
-void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, TInputBuf& buf);
-
-NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory, TInputBuf& buf);
-
-NKikimr::NUdf::TUnboxedValue ReadSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf);
-extern "C" void ReadContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf,
- bool wrapOptional);
-
-extern "C" void WriteYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
-
-void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
-
-void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
-
-extern "C" void WriteContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
- const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
-
-void WriteYsonValueInTableFormat(TOutputBuf& buf, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, bool topLevel);
-
TExprNode::TPtr ValueToExprLiteral(const TTypeAnnotationNode* type, const NKikimr::NUdf::TUnboxedValuePod& value, TExprContext& ctx,
TPositionHandle pos = {});
diff --git a/yt/yql/providers/yt/codec/codegen/ya.make.inc b/yt/yql/providers/yt/codec/codegen/ya.make.inc
index 0548d7e545..e418e84d22 100644
--- a/yt/yql/providers/yt/codec/codegen/ya.make.inc
+++ b/yt/yql/providers/yt/codec/codegen/ya.make.inc
@@ -15,6 +15,7 @@ PEERDIR(
yql/essentials/parser/pg_wrapper/interface
yql/essentials/utils
yt/yql/providers/yt/codec/codegen
+ yt/yql/providers/yt/codec
)
IF (NOT MKQL_DISABLE_CODEGEN)
diff --git a/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp b/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp
index 61c56e5531..4c7745155f 100644
--- a/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp
+++ b/yt/yql/providers/yt/codec/codegen/yt_codec_cg.cpp
@@ -4,6 +4,7 @@
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
#include <yql/essentials/providers/common/codec/yql_codec_buf.h>
#include <yql/essentials/providers/common/codec/yql_codec_type_flags.h>
+#include <yt/yql/providers/yt/codec/yt_codec.h>
#ifndef MKQL_DISABLE_CODEGEN
#include <yql/essentials/minikql/mkql_node.h>
@@ -171,7 +172,7 @@ public:
const auto valType = Type::getInt128Ty(context);
const auto flagsConst = ConstantInt::get(Type::getInt64Ty(context), nativeYtTypeFlags);
if (nativeYtTypeFlags) {
- const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::NCommon::WriteContainerNativeYtValue);
+ const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::WriteContainerNativeYtValue);
const auto funType = FunctionType::get(Type::getVoidTy(context), {
Type::getInt64Ty(context), Type::getInt64Ty(context), PointerType::getUnqual(valType),
PointerType::getUnqual(Type::getInt8Ty(context))
@@ -180,7 +181,7 @@ public:
const auto funcPtr = CastInst::Create(Instruction::IntToPtr, funcAddr, PointerType::getUnqual(funType), "ptr", Block_);
CallInst::Create(funType, funcPtr, { typeConst, flagsConst, elemPtr, buf }, "", Block_);
} else {
- const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::NCommon::WriteYsonContainerValue);
+ const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), (ui64)&NYql::WriteYsonContainerValue);
const auto funType = FunctionType::get(Type::getVoidTy(context), {
Type::getInt64Ty(context), Type::getInt64Ty(context), PointerType::getUnqual(valType),
PointerType::getUnqual(Type::getInt8Ty(context))
@@ -792,7 +793,7 @@ private:
void GenerateContainer(Value* velemPtr, Value* buf, TType* type, bool wrapOptional, ui64 nativeYtTypeFlags) {
auto& context = Codegen_->GetContext();
- const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), nativeYtTypeFlags ? (ui64)&NCommon::ReadContainerNativeYtValue : (ui64)&NCommon::ReadYsonContainerValue);
+ const auto funcAddr = ConstantInt::get(Type::getInt64Ty(context), nativeYtTypeFlags ? (ui64)&ReadContainerNativeYtValue : (ui64)&ReadYsonContainerValue);
const auto typeConst = ConstantInt::get(Type::getInt64Ty(context), (ui64)type);
const auto holderFactoryConst = ConstantInt::get(Type::getInt64Ty(context), (ui64)&HolderFactory_);
const auto wrapConst = ConstantInt::get(Type::getInt1Ty(context), wrapOptional);
diff --git a/yt/yql/providers/yt/codec/ya.make b/yt/yql/providers/yt/codec/ya.make
index 8853ac1485..ef0593121f 100644
--- a/yt/yql/providers/yt/codec/ya.make
+++ b/yt/yql/providers/yt/codec/ya.make
@@ -33,6 +33,7 @@ PEERDIR(
yt/yql/providers/yt/common
yt/yql/providers/yt/lib/mkql_helpers
yt/yql/providers/yt/lib/skiff
+ yt/yt/library/decimal
yql/essentials/providers/common/codec/yt_arrow_converter_interface
)
diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp
index 86381e1920..6e64136937 100644
--- a/yt/yql/providers/yt/codec/yt_codec.cpp
+++ b/yt/yql/providers/yt/codec/yt_codec.cpp
@@ -10,9 +10,17 @@
#include <yql/essentials/minikql/mkql_node_builder.h>
#include <yql/essentials/minikql/mkql_string_util.h>
#include <yql/essentials/utils/yql_panic.h>
+#include <yql/essentials/utils/swap_bytes.h>
#include <yql/essentials/core/yql_type_annotation.h>
+#include <yql/essentials/public/result_format/yql_codec_results.h>
+#include <yql/essentials/public/decimal/yql_decimal.h>
+#include <yql/essentials/public/decimal/yql_decimal_serialize.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
+
+#include <yt/yt/library/decimal/decimal.h>
#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yson/detail.h>
#include <util/generic/hash_set.h>
#include <util/generic/map.h>
@@ -28,6 +36,7 @@ namespace NYql {
using namespace NKikimr;
using namespace NKikimr::NMiniKQL;
+using namespace NYson::NDetail;
//////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -675,4 +684,1913 @@ TMkqlIOCache::TMkqlIOCache(const TMkqlIOSpecs& specs, const THolderFactory& hold
}
}
+// Reads one floating-point number from `buf` and returns it converted to T.
+// `cmd` is the marker byte that precedes the payload; it is validated with
+// CHECK_EXPECTED (defined elsewhere) against DoubleMarker before any bytes
+// are consumed. The payload is always sizeof(double) raw bytes, whatever T is.
+template <typename T>
+T ReadYsonFloatNumberInTableFormat(char cmd, NCommon::TInputBuf& buf) {
+    CHECK_EXPECTED(cmd, DoubleMarker);
+    double parsed;
+    // Fill the local double directly from the input bytes.
+    buf.ReadMany(reinterpret_cast<char*>(&parsed), sizeof(double));
+    // Converts to T on return (narrowing when T is float).
+    return static_cast<T>(parsed);
+}
+
+NUdf::TUnboxedValue ReadYsonValueInTableFormat(TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, NCommon::TInputBuf& buf) {
+ switch (type->GetKind()) {
+ case TType::EKind::Variant: {
+ auto varType = static_cast<TVariantType*>(type);
+ auto underlyingType = varType->GetUnderlyingType();
+ if (nativeYtTypeFlags & NTCF_COMPLEX) {
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+ TType* type = nullptr;
+ i64 index = 0;
+ if (cmd == StringMarker) {
+ YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
+ auto structType = static_cast<TStructType*>(underlyingType);
+ auto nameBuffer = ReadNextString(cmd, buf);
+ auto foundIndex = structType->FindMemberIndex(nameBuffer);
+ YQL_ENSURE(foundIndex.Defined(), "Unexpected member: " << nameBuffer);
+ index = *foundIndex;
+ type = varType->GetAlternativeType(index);
+ } else {
+ YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
+ YQL_ENSURE(underlyingType->IsTuple(), "Expected tuple as underlying type");
+ if (cmd == Uint64Marker) {
+ index = buf.ReadVarUI64();
+ } else {
+ index = buf.ReadVarI64();
+ }
+ YQL_ENSURE(0 <= index && index < varType->GetAlternativesCount(), "Unexpected member index: " << index);
+ type = varType->GetAlternativeType(index);
+ }
+ cmd = buf.Read();
+ CHECK_EXPECTED(cmd, ListItemSeparatorSymbol);
+ cmd = buf.Read();
+ auto value = ReadYsonValueInTableFormat(type, nativeYtTypeFlags, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd != EndListSymbol) {
+ CHECK_EXPECTED(cmd, ListItemSeparatorSymbol);
+ cmd = buf.Read();
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ }
+ return holderFactory.CreateVariantHolder(value.Release(), index);
+ } else {
+ if (cmd == StringMarker) {
+ YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
+ auto name = ReadNextString(cmd, buf);
+ auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
+ YQL_ENSURE(index, "Unexpected member: " << name);
+ YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type");
+ return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index);
+ }
+
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+ i64 index = 0;
+ YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
+ if (cmd == Uint64Marker) {
+ index = buf.ReadVarUI64();
+ } else {
+ index = buf.ReadVarI64();
+ }
+
+ YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " <<
+ varType->GetAlternativesCount() << " are available");
+ YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
+ TType* itemType;
+ if (underlyingType->IsTuple()) {
+ itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
+ }
+ else {
+ itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
+ }
+
+ EXPECTED(buf, ListItemSeparatorSymbol);
+ cmd = buf.Read();
+ auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return holderFactory.CreateVariantHolder(value.Release(), index);
+ }
+ }
+
+ case TType::EKind::Data: {
+ auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
+ switch (schemeType) {
+ case NUdf::TDataType<bool>::Id:
+ YQL_ENSURE(cmd == FalseMarker || cmd == TrueMarker, "Expected either true or false, but got: " << TString(cmd).Quote());
+ return NUdf::TUnboxedValuePod(cmd == TrueMarker);
+
+ case NUdf::TDataType<ui8>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod(ui8(buf.ReadVarUI64()));
+
+ case NUdf::TDataType<i8>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod(i8(buf.ReadVarI64()));
+
+ case NUdf::TDataType<ui16>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod(ui16(buf.ReadVarUI64()));
+
+ case NUdf::TDataType<i16>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod(i16(buf.ReadVarI64()));
+
+ case NUdf::TDataType<i32>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod(i32(buf.ReadVarI64()));
+
+ case NUdf::TDataType<ui32>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod(ui32(buf.ReadVarUI64()));
+
+ case NUdf::TDataType<i64>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod(buf.ReadVarI64());
+
+ case NUdf::TDataType<ui64>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod(buf.ReadVarUI64());
+
+ case NUdf::TDataType<float>::Id:
+ return NUdf::TUnboxedValuePod(ReadYsonFloatNumberInTableFormat<float>(cmd, buf));
+
+ case NUdf::TDataType<double>::Id:
+ return NUdf::TUnboxedValuePod(ReadYsonFloatNumberInTableFormat<double>(cmd, buf));
+
+ case NUdf::TDataType<NUdf::TUtf8>::Id:
+ case NUdf::TDataType<char*>::Id:
+ case NUdf::TDataType<NUdf::TJson>::Id:
+ case NUdf::TDataType<NUdf::TDyNumber>::Id:
+ case NUdf::TDataType<NUdf::TUuid>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(nextString)));
+ }
+
+ case NUdf::TDataType<NUdf::TDecimal>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ if (nativeYtTypeFlags & NTCF_DECIMAL) {
+ auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
+ if (params.first < 10) {
+ // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf
+ NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary32(params.first, nextString));
+ YQL_ENSURE(!NDecimal::IsError(res));
+ return NUdf::TUnboxedValuePod(res);
+ } else if (params.first < 19) {
+ NDecimal::TInt128 res = NDecimal::FromYtDecimal(NYT::NDecimal::TDecimal::ParseBinary64(params.first, nextString));
+ YQL_ENSURE(!NDecimal::IsError(res));
+ return NUdf::TUnboxedValuePod(res);
+ } else {
+ YQL_ENSURE(params.first < 36);
+ NYT::NDecimal::TDecimal::TValue128 tmpRes = NYT::NDecimal::TDecimal::ParseBinary128(params.first, nextString);
+ NDecimal::TInt128 res;
+ static_assert(sizeof(NDecimal::TInt128) == sizeof(NYT::NDecimal::TDecimal::TValue128));
+ memcpy(&res, &tmpRes, sizeof(NDecimal::TInt128));
+ res = NDecimal::FromYtDecimal(res);
+ YQL_ENSURE(!NDecimal::IsError(res));
+ return NUdf::TUnboxedValuePod(res);
+ }
+ }
+ else {
+ const auto& des = NDecimal::Deserialize(nextString.data(), nextString.size());
+ YQL_ENSURE(!NDecimal::IsError(des.first));
+ YQL_ENSURE(nextString.size() == des.second);
+ return NUdf::TUnboxedValuePod(des.first);
+ }
+ }
+
+ case NUdf::TDataType<NUdf::TYson>::Id: {
+ auto& yson = buf.YsonBuffer();
+ yson.clear();
+ CopyYsonWithAttrs(cmd, buf, yson);
+
+ return NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(yson)));
+ }
+
+ case NUdf::TDataType<NUdf::TDate>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod((ui16)buf.ReadVarUI64());
+
+ case NUdf::TDataType<NUdf::TDatetime>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod((ui32)buf.ReadVarUI64());
+
+ case NUdf::TDataType<NUdf::TTimestamp>::Id:
+ CHECK_EXPECTED(cmd, Uint64Marker);
+ return NUdf::TUnboxedValuePod(buf.ReadVarUI64());
+
+ case NUdf::TDataType<NUdf::TInterval>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod(buf.ReadVarI64());
+
+ case NUdf::TDataType<NUdf::TTzDate>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ NUdf::TUnboxedValuePod data;
+ ui16 value;
+ ui16 tzId = 0;
+ YQL_ENSURE(DeserializeTzDate(nextString, value, tzId));
+ data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ NUdf::TUnboxedValuePod data;
+ ui32 value;
+ ui16 tzId = 0;
+ YQL_ENSURE(DeserializeTzDatetime(nextString, value, tzId));
+ data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ NUdf::TUnboxedValuePod data;
+ ui64 value;
+ ui16 tzId = 0;
+ YQL_ENSURE(DeserializeTzTimestamp(nextString, value, tzId));
+ data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TDate32>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod((i32)buf.ReadVarI64());
+
+ case NUdf::TDataType<NUdf::TDatetime64>::Id:
+ case NUdf::TDataType<NUdf::TTimestamp64>::Id:
+ case NUdf::TDataType<NUdf::TInterval64>::Id:
+ CHECK_EXPECTED(cmd, Int64Marker);
+ return NUdf::TUnboxedValuePod(buf.ReadVarI64());
+
+ case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
+ return ValueFromString(EDataSlot::JsonDocument, ReadNextString(cmd, buf));
+ }
+
+ case NUdf::TDataType<NUdf::TTzDate32>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ NUdf::TUnboxedValuePod data;
+ i32 value;
+ ui16 tzId = 0;
+ YQL_ENSURE(DeserializeTzDate32(nextString, value, tzId));
+ data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ NUdf::TUnboxedValuePod data;
+ i64 value;
+ ui16 tzId = 0;
+ YQL_ENSURE(DeserializeTzDatetime64(nextString, value, tzId));
+ data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
+ auto nextString = ReadNextString(cmd, buf);
+ NUdf::TUnboxedValuePod data;
+ i64 value;
+ ui16 tzId = 0;
+ YQL_ENSURE(DeserializeTzTimestamp64(nextString, value, tzId));
+ data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ default:
+ YQL_ENSURE(false, "Unsupported data type: " << schemeType);
+ }
+ }
+
+ case TType::EKind::Struct: {
+ YQL_ENSURE(cmd == BeginListSymbol || cmd == BeginMapSymbol);
+ auto structType = static_cast<TStructType*>(type);
+ NUdf::TUnboxedValue* items;
+ NUdf::TUnboxedValue ret = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), items);
+ if (cmd == BeginListSymbol) {
+ cmd = buf.Read();
+
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ items[i] = ReadYsonValueInTableFormat(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, cmd, buf);
+
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ }
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return ret;
+ } else {
+ cmd = buf.Read();
+
+ for (;;) {
+ if (cmd == EndMapSymbol) {
+ break;
+ }
+
+ auto keyBuffer = ReadNextString(cmd, buf);
+ auto pos = structType->FindMemberIndex(keyBuffer);
+ EXPECTED(buf, KeyValueSeparatorSymbol);
+ cmd = buf.Read();
+ if (pos && cmd != '#') {
+ auto memberType = structType->GetMemberType(*pos);
+ auto unwrappedType = memberType;
+ if (!(nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) && unwrappedType->IsOptional()) {
+ unwrappedType = static_cast<TOptionalType*>(unwrappedType)->GetItemType();
+ }
+
+ items[*pos] = ReadYsonValueInTableFormat(unwrappedType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ } else {
+ SkipYson(cmd, buf);
+ }
+
+ cmd = buf.Read();
+ if (cmd == KeyedItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ }
+
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ if (items[i]) {
+ continue;
+ }
+
+ YQL_ENSURE(structType->GetMemberType(i)->IsOptional(), "Missing required field: " << structType->GetMemberName(i));
+ }
+
+ return ret;
+ }
+ }
+
+ case TType::EKind::List: {
+ auto itemType = static_cast<TListType*>(type)->GetItemType();
+ TDefaultListRepresentation items;
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+
+ for (;;) {
+ if (cmd == EndListSymbol) {
+ break;
+ }
+
+ items = items.Append(ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf));
+
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ }
+
+ return holderFactory.CreateDirectListHolder(std::move(items));
+ }
+
+ case TType::EKind::Optional: {
+ if (cmd == EntitySymbol) {
+ return NUdf::TUnboxedValuePod();
+ }
+ auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
+ if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) {
+ if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) {
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+ auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return value.Release().MakeOptional();
+ } else {
+ return ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf).Release().MakeOptional();
+ }
+ } else {
+ if (cmd != BeginListSymbol) {
+ auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ return value.Release().MakeOptional();
+ }
+
+ cmd = buf.Read();
+ if (cmd == EndListSymbol) {
+ return NUdf::TUnboxedValuePod();
+ }
+
+ auto value = ReadYsonValueInTableFormat(itemType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return value.Release().MakeOptional();
+ }
+ }
+
+ case TType::EKind::Dict: {
+ auto dictType = static_cast<TDictType*>(type);
+ auto keyType = dictType->GetKeyType();
+ auto payloadType = dictType->GetPayloadType();
+ TKeyTypes types;
+ bool isTuple;
+ bool encoded;
+ bool useIHash;
+ GetDictionaryKeyTypes(keyType, types, isTuple, encoded, useIHash);
+
+ TMaybe<TValuePacker> packer;
+ if (encoded) {
+ packer.ConstructInPlace(true, keyType);
+ }
+
+ YQL_ENSURE(cmd == BeginListSymbol || cmd == BeginMapSymbol, "Expected '{' or '[', but read: " << TString(cmd).Quote());
+ if (cmd == BeginMapSymbol) {
+ bool unusedIsOptional;
+ auto unpackedType = UnpackOptional(keyType, unusedIsOptional);
+ YQL_ENSURE(unpackedType->IsData() &&
+ (static_cast<TDataType*>(unpackedType)->GetSchemeType() == NUdf::TDataType<char*>::Id ||
+ static_cast<TDataType*>(unpackedType)->GetSchemeType() == NUdf::TDataType<NUdf::TUtf8>::Id),
+ "Expected String or Utf8 type as dictionary key type");
+
+ auto filler = [&](TValuesDictHashMap& map) {
+ cmd = buf.Read();
+
+ for (;;) {
+ if (cmd == EndMapSymbol) {
+ break;
+ }
+
+ auto keyBuffer = ReadNextString(cmd, buf);
+ auto keyStr = NUdf::TUnboxedValue(MakeString(keyBuffer));
+ EXPECTED(buf, KeyValueSeparatorSymbol);
+ cmd = buf.Read();
+ auto payload = ReadYsonValueInTableFormat(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ map.emplace(std::move(keyStr), std::move(payload));
+
+ cmd = buf.Read();
+ if (cmd == KeyedItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ }
+ };
+
+ const NUdf::IHash* hash = holderFactory.GetHash(*keyType, useIHash);
+ const NUdf::IEquate* equate = holderFactory.GetEquate(*keyType, useIHash);
+ return holderFactory.CreateDirectHashedDictHolder(filler, types, isTuple, true, nullptr, hash, equate);
+ }
+ else {
+ auto filler = [&](TValuesDictHashMap& map) {
+ cmd = buf.Read();
+
+ for (;;) {
+ if (cmd == EndListSymbol) {
+ break;
+ }
+
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+ auto key = ReadYsonValueInTableFormat(keyType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ EXPECTED(buf, ListItemSeparatorSymbol);
+ cmd = buf.Read();
+ auto payload = ReadYsonValueInTableFormat(payloadType, nativeYtTypeFlags, holderFactory, cmd, buf);
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ if (packer) {
+ key = MakeString(packer->Pack(key));
+ }
+
+ map.emplace(std::move(key), std::move(payload));
+
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ }
+ };
+
+ const NUdf::IHash* hash = holderFactory.GetHash(*keyType, useIHash);
+ const NUdf::IEquate* equate = holderFactory.GetEquate(*keyType, useIHash);
+ return holderFactory.CreateDirectHashedDictHolder(filler, types, isTuple, true, encoded ? keyType : nullptr,
+ hash, equate);
+ }
+ }
+
+ case TType::EKind::Tuple: {
+ auto tupleType = static_cast<TTupleType*>(type);
+ NUdf::TUnboxedValue* items;
+ NUdf::TUnboxedValue ret = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), items);
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ items[i] = ReadYsonValueInTableFormat(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, cmd, buf);
+
+ cmd = buf.Read();
+ if (cmd == ListItemSeparatorSymbol) {
+ cmd = buf.Read();
+ }
+ }
+
+
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return ret;
+ }
+
+ case TType::EKind::Void: {
+ if (cmd == EntitySymbol) {
+ return NUdf::TUnboxedValuePod::Void();
+ }
+
+ auto nextString = ReadNextString(cmd, buf);
+ YQL_ENSURE(nextString == NResult::TYsonResultWriter::VoidString, "Expected Void");
+ return NUdf::TUnboxedValuePod::Void();
+ }
+
+ case TType::EKind::Null: {
+ CHECK_EXPECTED(cmd, EntitySymbol);
+ return NUdf::TUnboxedValuePod();
+ }
+
+ case TType::EKind::EmptyList: {
+ CHECK_EXPECTED(cmd, BeginListSymbol);
+ cmd = buf.Read();
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ return holderFactory.GetEmptyContainerLazy();
+ }
+
+ case TType::EKind::EmptyDict: {
+ YQL_ENSURE(cmd == BeginListSymbol || cmd == BeginMapSymbol, "Expected '{' or '[', but read: " << TString(cmd).Quote());
+ if (cmd == BeginListSymbol) {
+ cmd = buf.Read();
+ CHECK_EXPECTED(cmd, EndListSymbol);
+ } else {
+ cmd = buf.Read();
+ CHECK_EXPECTED(cmd, EndMapSymbol);
+ }
+
+ return holderFactory.GetEmptyContainerLazy();
+ }
+
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ return ReadYsonValueInTableFormatPg(pgType, cmd, buf);
+ }
+
+ case TType::EKind::Tagged: {
+ auto taggedType = static_cast<TTaggedType*>(type);
+ return ReadYsonValueInTableFormat(taggedType->GetBaseType(), nativeYtTypeFlags, holderFactory, cmd, buf);
+ }
+
+ default:
+ YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
+ }
+}
+
+// Parses one value of the given type from an in-memory YSON buffer.
+// Returns the decoded value on success. If a yexception is thrown while decoding,
+// returns Nothing() and, when err is non-null, writes the failure text to *err.
+TMaybe<NUdf::TUnboxedValue> ParseYsonValueInTableFormat(const THolderFactory& holderFactory,
+    const TStringBuf& yson, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err) {
+    // Block reader that exposes the whole input as a single block and then reports end of data.
+    class TSingleBlockReader : public NCommon::IBlockReader {
+    public:
+        TSingleBlockReader(const TStringBuf& data)
+            : Data_(data)
+        {}
+
+        void SetDeadline(TInstant) override {
+        }
+
+        std::pair<const char*, const char*> NextFilledBlock() override {
+            if (Consumed_) {
+                return { nullptr, nullptr };
+            }
+            Consumed_ = true;
+            return { Data_.begin(), Data_.end() };
+        }
+
+        void ReturnBlock() override {
+        }
+
+        // Retries are never requested for an in-memory source.
+        bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override {
+            return false;
+        }
+
+    private:
+        TStringBuf Data_;
+        bool Consumed_ = false;
+    };
+
+    try {
+        TSingleBlockReader reader(yson);
+        NCommon::TInputBuf input(reader, nullptr);
+        // The value parser expects the first byte of the value to be read already.
+        char firstByte = input.Read();
+        return ReadYsonValueInTableFormat(type, nativeYtTypeFlags, holderFactory, firstByte, input);
+    } catch (const yexception& e) {
+        if (err != nullptr) {
+            *err << "YSON parsing failed: " << e.what();
+        }
+    }
+    return Nothing();
+}
+
+// Serializes the node to binary YSON and parses it with ParseYsonValueInTableFormat.
+TMaybe<NUdf::TUnboxedValue> ParseYsonNode(const THolderFactory& holderFactory,
+    const NYT::TNode& node, TType* type, ui64 nativeYtTypeFlags, IOutputStream* err) {
+    const auto binaryYson = NYT::NodeToYsonString(node, NYson::EYsonFormat::Binary);
+    return ParseYsonValueInTableFormat(holderFactory, binaryYson, type, nativeYtTypeFlags, err);
+}
+
+// Reads a size-prefixed YSON value from buf and stores the decoded result in value.
+// When wrapOptional is set, the decoded value is additionally wrapped with MakeOptional().
+extern "C" void ReadYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+    NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, bool wrapOptional) {
+    // Size prefix of the YSON payload; it is validated but not used to bound the parse below.
+    ui32 payloadSize;
+    buf.ReadMany(reinterpret_cast<char*>(&payloadSize), sizeof(payloadSize));
+    CHECK_STRING_LENGTH_UNSIGNED(payloadSize);
+    YQL_ENSURE(payloadSize > 0);
+
+    // The payload is binary YSON; the parser is primed with its first byte.
+    char firstByte = buf.Read();
+    auto parsed = ReadYsonValueInTableFormat(type, nativeYtTypeFlags, holderFactory, firstByte, buf);
+    if (wrapOptional) {
+        value = parsed.Release().MakeOptional();
+        return;
+    }
+    value = std::move(parsed);
+}
+
+// Decodes one value with ReadSkiffNativeYtValue and stores it in value,
+// wrapping it with MakeOptional() first when wrapOptional is set.
+extern "C" void ReadContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+    NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf, bool wrapOptional) {
+    auto decoded = ReadSkiffNativeYtValue(type, nativeYtTypeFlags, holderFactory, buf);
+    if (wrapOptional) {
+        value = decoded.Release().MakeOptional();
+        return;
+    }
+    value = std::move(decoded);
+}
+
+void WriteYsonValueInTableFormat(NCommon::TOutputBuf& buf, TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, bool topLevel) {
+ /* Table format, very compact.
+  *
+  * Serializes `value`, interpreted according to `type`, into `buf` as YSON: a marker or bracket
+  * symbol followed by varint / raw payload bytes, exactly as written by the cases below.
+  * Containers recurse into this function with topLevel == false.
+  *
+  * buf               - destination buffer.
+  * type              - type describing `value`; its kind selects the encoding.
+  * nativeYtTypeFlags - NTCF_COMPLEX changes how Variant, Struct and Optional are laid out,
+  *                     NTCF_DECIMAL selects the YT binary decimal encoding.
+  * value             - the value to serialize.
+  * topLevel          - only affects how an empty Optional is written when NTCF_COMPLEX is not set;
+  *                     it is also forwarded to the Pg writer.
+  *
+  * An unsupported type kind or data type fails through YQL_ENSURE.
+  */
+ switch (type->GetKind()) {
+ case TType::EKind::Variant: { // written as the list [alternative; item;]
+ buf.Write(BeginListSymbol);
+ auto varType = static_cast<TVariantType*>(type);
+ auto underlyingType = varType->GetUnderlyingType();
+ auto index = value.GetVariantIndex();
+ YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << varType->GetAlternativesCount() << " are available");
+ YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
+ TType* itemType;
+ if (underlyingType->IsTuple()) {
+ itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
+ }
+ else {
+ itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
+ }
+ if (!(nativeYtTypeFlags & NTCF_COMPLEX) || underlyingType->IsTuple()) { // alternative identified by its index
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(index);
+ } else { // struct variant in NTCF_COMPLEX mode: alternative identified by member name
+ auto structType = static_cast<TStructType*>(underlyingType);
+ auto varName = structType->GetMemberName(index);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(varName.size());
+ buf.WriteMany(varName);
+ }
+ buf.Write(ListItemSeparatorSymbol);
+ WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetVariantItem(), false);
+ buf.Write(ListItemSeparatorSymbol);
+ buf.Write(EndListSymbol);
+ break;
+ }
+
+ case TType::EKind::Data: {
+ auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
+ switch (schemeType) {
+ case NUdf::TDataType<bool>::Id: {
+ buf.Write(value.Get<bool>() ? TrueMarker : FalseMarker);
+ break;
+ }
+
+ case NUdf::TDataType<ui8>::Id: // every unsigned integer width is written as a uint64 varint
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui8>());
+ break;
+
+ case NUdf::TDataType<i8>::Id: // every signed integer width is written as an int64 varint
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(value.Get<i8>());
+ break;
+
+ case NUdf::TDataType<ui16>::Id:
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui16>());
+ break;
+
+ case NUdf::TDataType<i16>::Id:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(value.Get<i16>());
+ break;
+
+ case NUdf::TDataType<i32>::Id:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(value.Get<i32>());
+ break;
+
+ case NUdf::TDataType<ui32>::Id:
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui32>());
+ break;
+
+ case NUdf::TDataType<i64>::Id:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(value.Get<i64>());
+ break;
+
+ case NUdf::TDataType<ui64>::Id:
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui64>());
+ break;
+
+ case NUdf::TDataType<float>::Id: {
+ buf.Write(DoubleMarker);
+ double val = value.Get<float>(); // float is widened to double before being written
+ buf.WriteMany((const char*)&val, sizeof(val));
+ break;
+ }
+
+ case NUdf::TDataType<double>::Id: {
+ buf.Write(DoubleMarker);
+ double val = value.Get<double>();
+ buf.WriteMany((const char*)&val, sizeof(val));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TUtf8>::Id:
+ case NUdf::TDataType<char*>::Id:
+ case NUdf::TDataType<NUdf::TJson>::Id:
+ case NUdf::TDataType<NUdf::TDyNumber>::Id:
+ case NUdf::TDataType<NUdf::TUuid>::Id: { // string-like types: marker, varint length, raw bytes
+ buf.Write(StringMarker);
+ auto str = value.AsStringRef();
+ buf.WriteVarI32(str.Size());
+ buf.WriteMany(str);
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TDecimal>::Id: {
+ buf.Write(StringMarker);
+ if (nativeYtTypeFlags & NTCF_DECIMAL){ // YT binary decimal; the width is chosen from the first decimal parameter
+ auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
+ const NDecimal::TInt128 data128 = value.GetInt128();
+ char tmpBuf[NYT::NDecimal::TDecimal::MaxBinarySize];
+ if (params.first < 10) {
+ // The YQL format differs from the YT format in the inf/nan values. NDecimal::FromYtDecimal converts nan/inf (this write path calls NDecimal::ToYtDecimal)
+ TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary32(params.first, NDecimal::ToYtDecimal<i32>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize);
+ buf.WriteVarI32(resBuf.size());
+ buf.WriteMany(resBuf.data(), resBuf.size());
+ } else if (params.first < 19) {
+ TStringBuf resBuf = NYT::NDecimal::TDecimal::WriteBinary64(params.first, NDecimal::ToYtDecimal<i64>(data128), tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize);
+ buf.WriteVarI32(resBuf.size());
+ buf.WriteMany(resBuf.data(), resBuf.size());
+ } else {
+ YQL_ENSURE(params.first < 36);
+ NYT::NDecimal::TDecimal::TValue128 val;
+ auto data128Converted = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128);
+ memcpy(&val, &data128Converted, sizeof(val));
+ auto resBuf = NYT::NDecimal::TDecimal::WriteBinary128(params.first, val, tmpBuf, NYT::NDecimal::TDecimal::MaxBinarySize);
+ buf.WriteVarI32(resBuf.size());
+ buf.WriteMany(resBuf.data(), resBuf.size());
+ }
+ } else { // without NTCF_DECIMAL: bytes produced by NDecimal::Serialize
+ char data[sizeof(NDecimal::TInt128)];
+ const ui32 size = NDecimal::Serialize(value.GetInt128(), data);
+ buf.WriteVarI32(size);
+ buf.WriteMany(data, size);
+ }
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TYson>::Id: {
+ // embed content: the stored bytes are written as is, with no marker or length prefix
+ buf.WriteMany(value.AsStringRef());
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TDate>::Id:
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui16>());
+ break;
+
+ case NUdf::TDataType<NUdf::TDatetime>::Id:
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui32>());
+ break;
+
+ case NUdf::TDataType<NUdf::TTimestamp>::Id:
+ buf.Write(Uint64Marker);
+ buf.WriteVarUI64(value.Get<ui64>());
+ break;
+
+ case NUdf::TDataType<NUdf::TInterval>::Id:
+ case NUdf::TDataType<NUdf::TInterval64>::Id:
+ case NUdf::TDataType<NUdf::TDatetime64>::Id:
+ case NUdf::TDataType<NUdf::TTimestamp64>::Id:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(value.Get<i64>());
+ break;
+
+ case NUdf::TDataType<NUdf::TDate32>::Id:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(value.Get<i32>());
+ break;
+
+ case NUdf::TDataType<NUdf::TTzDate>::Id: { // Tz types: byte-swapped value followed by byte-swapped timezone id, packed as one string
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui16 data = SwapBytes(value.Get<ui16>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(size);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui32 data = SwapBytes(value.Get<ui32>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(size);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui64 data = SwapBytes(value.Get<ui64>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(size);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDate32>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); // NOTE(review): presumably flips the sign bit of the byte-swapped form on little-endian hosts - confirm
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(size);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(size);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(size);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
+ buf.Write(StringMarker);
+ NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); // written in the string form produced by ValueToString
+ auto str = json.AsStringRef();
+ buf.WriteVarI32(str.Size());
+ buf.WriteMany(str);
+ break;
+ }
+
+ default:
+ YQL_ENSURE(false, "Unsupported data type: " << schemeType);
+ }
+
+ break;
+ }
+
+ case TType::EKind::Struct: {
+ auto structType = static_cast<TStructType*>(type);
+ if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) { // complex mode: map keyed by member name
+ buf.Write(BeginMapSymbol);
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ buf.Write(StringMarker);
+ auto key = structType->GetMemberName(i);
+ buf.WriteVarI32(key.size());
+ buf.WriteMany(key);
+ buf.Write(KeyValueSeparatorSymbol);
+ WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
+ buf.Write(KeyedItemSeparatorSymbol);
+ }
+ buf.Write(EndMapSymbol);
+ } else { // otherwise: positional list of member values
+ buf.Write(BeginListSymbol);
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
+ buf.Write(ListItemSeparatorSymbol);
+ }
+ buf.Write(EndListSymbol);
+ }
+ break;
+ }
+
+ case TType::EKind::List: {
+ auto itemType = static_cast<TListType*>(type)->GetItemType();
+ const auto iter = value.GetListIterator();
+ buf.Write(BeginListSymbol);
+ for (NUdf::TUnboxedValue item; iter.Next(item); buf.Write(ListItemSeparatorSymbol)) { // a separator follows every item, including the last
+ WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, item, false);
+ }
+
+ buf.Write(EndListSymbol);
+ break;
+ }
+
+ case TType::EKind::Optional: {
+ auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
+ if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) {
+ if (value) {
+ if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) { // nested Optional / Pg items get an extra list wrapper
+ buf.Write(BeginListSymbol);
+ }
+ WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
+ if (itemType->GetKind() == TType::EKind::Optional || itemType->GetKind() == TType::EKind::Pg) {
+ buf.Write(ListItemSeparatorSymbol);
+ buf.Write(EndListSymbol);
+ }
+ } else {
+ buf.Write(EntitySymbol);
+ }
+ } else {
+ if (!value) {
+ if (topLevel) { // an empty optional is an empty list at top level and an entity when nested
+ buf.Write(BeginListSymbol);
+ buf.Write(EndListSymbol);
+ }
+ else {
+ buf.Write(EntitySymbol);
+ }
+ }
+ else { // a present optional is a one-element list
+ buf.Write(BeginListSymbol);
+ WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
+ buf.Write(ListItemSeparatorSymbol);
+ buf.Write(EndListSymbol);
+ }
+ }
+ break;
+ }
+
+ case TType::EKind::Dict: {
+ auto dictType = static_cast<TDictType*>(type);
+ const auto iter = value.GetDictIterator();
+ buf.Write(BeginListSymbol);
+ for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) { // each entry is the list [key; payload;]
+ buf.Write(BeginListSymbol);
+ WriteYsonValueInTableFormat(buf, dictType->GetKeyType(), nativeYtTypeFlags, key, false);
+ buf.Write(ListItemSeparatorSymbol);
+ WriteYsonValueInTableFormat(buf, dictType->GetPayloadType(), nativeYtTypeFlags, payload, false);
+ buf.Write(ListItemSeparatorSymbol);
+ buf.Write(EndListSymbol);
+ buf.Write(ListItemSeparatorSymbol);
+ }
+
+ buf.Write(EndListSymbol);
+ break;
+ }
+
+ case TType::EKind::Tuple: {
+ auto tupleType = static_cast<TTupleType*>(type);
+ buf.Write(BeginListSymbol);
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ WriteYsonValueInTableFormat(buf, tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), false);
+ buf.Write(ListItemSeparatorSymbol);
+ }
+
+ buf.Write(EndListSymbol);
+ break;
+ }
+
+ case TType::EKind::Void: {
+ buf.Write(EntitySymbol);
+ break;
+ }
+
+ case TType::EKind::Null: {
+ buf.Write(EntitySymbol);
+ break;
+ }
+
+ case TType::EKind::EmptyList: {
+ buf.Write(BeginListSymbol);
+ buf.Write(EndListSymbol);
+ break;
+ }
+
+ case TType::EKind::EmptyDict: { // an empty dict is written as an empty list, not as a map
+ buf.Write(BeginListSymbol);
+ buf.Write(EndListSymbol);
+ break;
+ }
+
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ WriteYsonValueInTableFormatPg(buf, pgType, value, topLevel);
+ break;
+ }
+
+ default: // NOTE(review): ReadYsonValueInTableFormat in this file accepts Tagged types, but there is no Tagged case here - confirm this is intended
+ YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
+ }
+}
+
+///////////////////////////////////////////
+//
+// Initial state first = last = &dummy
+//
+// +1 block first = &dummy, last = newPage, first.next = newPage, newPage.next= &dummy
+// +1 block first = &dummy, last = newPage2, first.next = newPage, newPage.next = newPage2, newPage2.next = &dummy
+//
+///////////////////////////////////////////
+// Block writer that accumulates output in pages taken from the thread-local allocator pool.
+// Pages form a singly linked ring closed by the Dummy_ sentinel; every page starts with a
+// TPageHeader followed by the payload bytes. WriteBlocks() replays the pages in the order
+// they were handed out.
+class TTempBlockWriter : public NCommon::IBlockWriter {
+public:
+    // Starts with an empty ring: the sentinel links to itself and is also the last node.
+    TTempBlockWriter()
+        : Pool_(*TlsAllocState)
+        , Last_(&Dummy_)
+    {
+        Dummy_.Next_ = &Dummy_;
+        Dummy_.Avail_ = 0;
+    }
+
+    TTempBlockWriter(const TTempBlockWriter&) = delete;
+    void operator=(const TTempBlockWriter&) = delete;
+
+    // Returns every page obtained by NextEmptyBlock() to the pool.
+    ~TTempBlockWriter() {
+        for (auto page = Dummy_.Next_; page != &Dummy_;) { // the sentinel itself is not a pool page
+            const auto following = page->Next_; // read the link before the page is given back
+            Pool_.ReturnPage(page);
+            page = following;
+        }
+    }
+
+    // Record boundaries are not tracked; the callback is ignored.
+    void SetRecordBoundaryCallback(std::function<void()> callback) override {
+        Y_UNUSED(callback);
+    }
+
+    // Copies the filled part of every page into buf, in allocation order.
+    void WriteBlocks(NCommon::TOutputBuf& buf) const {
+        for (auto page = Dummy_.Next_; page != &Dummy_; page = page->Next_) {
+            // Payload starts right after the header; Avail_ holds the number of bytes written.
+            buf.WriteMany((const char*)(page + 1), page->Avail_);
+        }
+    }
+
+    // Takes a fresh page from the pool, appends it to the ring and exposes the bytes after its header.
+    std::pair<char*, char*> NextEmptyBlock() override {
+        auto rawPage = Pool_.GetPage();
+        auto header = (TPageHeader*)rawPage;
+        header->Next_ = &Dummy_;
+        header->Avail_ = 0;
+        Last_->Next_ = header;
+        Last_ = header;
+        return { (char*)(header + 1), (char*)rawPage + TAlignedPagePool::POOL_PAGE_SIZE };
+    }
+
+    // Records how many bytes of the most recently handed-out block were filled.
+    void ReturnBlock(size_t avail, std::optional<size_t> lastRecordBoundary) override {
+        Y_UNUSED(lastRecordBoundary);
+        YQL_ENSURE(avail <= TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TPageHeader));
+        Last_->Avail_ = avail;
+    }
+
+    void Finish() override {
+    }
+
+private:
+    // Header stored at the beginning of every page.
+    struct TPageHeader {
+        TPageHeader* Next_ = nullptr;
+        ui32 Avail_ = 0;
+    };
+
+    NKikimr::TAlignedPagePool& Pool_;
+    TPageHeader* Last_;
+    TPageHeader Dummy_;
+};
+
+// Writes value to buf as a ui32 byte count followed by its table-format YSON encoding.
+// The encoding is produced into temporary pages first because its size must precede the payload.
+extern "C" void WriteYsonContainerValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+    TTempBlockWriter pages;
+    NCommon::TOutputBuf staging(pages, nullptr);
+    WriteYsonValueInTableFormat(staging, type, nativeYtTypeFlags, value, true);
+    staging.Flush();
+
+    ui32 payloadSize = staging.GetWrittenBytes();
+    buf.WriteMany(reinterpret_cast<const char*>(&payloadSize), sizeof(payloadSize));
+    pages.WriteBlocks(buf);
+}
+
+// Advances buf past one skiff-encoded field of the given type without building a value.
+//
+// type              - type of the field; a top-level Optional is unwrapped here.
+// nativeYtTypeFlags - NTCF_DECIMAL selects the fixed-width decimal layout; forwarded on recursion.
+// buf               - input positioned at the start of the field.
+//
+// Fails through YQL_ENSURE on an unsupported type, an unsupported data type, or a variant
+// alternative that is out of range.
+void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf) {
+    if (type->IsOptional()) {
+        // Unwrap optional; it is prefixed by a one-byte marker, zero meaning "no value follows".
+        type = static_cast<TOptionalType*>(type)->GetItemType();
+        if (!buf.Read()) {
+            return;
+        }
+    }
+
+    if (type->IsData()) {
+        auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
+        switch (schemeType) {
+        case NUdf::TDataType<bool>::Id:
+            buf.SkipMany(sizeof(ui8));
+            break;
+
+        // Every unsigned integer width occupies a full ui64 on the wire.
+        case NUdf::TDataType<ui8>::Id:
+        case NUdf::TDataType<ui16>::Id:
+        case NUdf::TDataType<ui32>::Id:
+        case NUdf::TDataType<ui64>::Id:
+        case NUdf::TDataType<NUdf::TDate>::Id:
+        case NUdf::TDataType<NUdf::TDatetime>::Id:
+        case NUdf::TDataType<NUdf::TTimestamp>::Id:
+            buf.SkipMany(sizeof(ui64));
+            break;
+
+        // Every signed integer width occupies a full i64 on the wire.
+        case NUdf::TDataType<i8>::Id:
+        case NUdf::TDataType<i16>::Id:
+        case NUdf::TDataType<i32>::Id:
+        case NUdf::TDataType<i64>::Id:
+        case NUdf::TDataType<NUdf::TInterval>::Id:
+        case NUdf::TDataType<NUdf::TDate32>::Id:
+        case NUdf::TDataType<NUdf::TDatetime64>::Id:
+        case NUdf::TDataType<NUdf::TTimestamp64>::Id:
+        case NUdf::TDataType<NUdf::TInterval64>::Id:
+            buf.SkipMany(sizeof(i64));
+            break;
+
+        case NUdf::TDataType<float>::Id:
+        case NUdf::TDataType<double>::Id:
+            buf.SkipMany(sizeof(double));
+            break;
+
+        // String-like payloads: ui32 length followed by that many bytes.
+        case NUdf::TDataType<NUdf::TUtf8>::Id:
+        case NUdf::TDataType<char*>::Id:
+        case NUdf::TDataType<NUdf::TJson>::Id:
+        case NUdf::TDataType<NUdf::TYson>::Id:
+        case NUdf::TDataType<NUdf::TUuid>::Id:
+        case NUdf::TDataType<NUdf::TDyNumber>::Id:
+        case NUdf::TDataType<NUdf::TTzDate>::Id:
+        case NUdf::TDataType<NUdf::TTzDatetime>::Id:
+        case NUdf::TDataType<NUdf::TTzTimestamp>::Id:
+        case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
+            ui32 size;
+            buf.ReadMany((char*)&size, sizeof(size));
+            CHECK_STRING_LENGTH_UNSIGNED(size);
+            buf.SkipMany(size);
+            break;
+        }
+        case NUdf::TDataType<NUdf::TDecimal>::Id: {
+            if (nativeYtTypeFlags & NTCF_DECIMAL) {
+                // Fixed-width layout; the width is chosen from the first decimal parameter.
+                auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
+                if (params.first < 10) {
+                    buf.SkipMany(sizeof(i32));
+                } else if (params.first < 19) {
+                    buf.SkipMany(sizeof(i64));
+                } else {
+                    buf.SkipMany(sizeof(NDecimal::TInt128));
+                }
+            } else {
+                // Length-prefixed layout.
+                ui32 size;
+                buf.ReadMany((char*)&size, sizeof(size));
+                CHECK_STRING_LENGTH_UNSIGNED(size);
+                buf.SkipMany(size);
+            }
+            break;
+        }
+        default:
+            YQL_ENSURE(false, "Unsupported data type: " << schemeType);
+        }
+        return;
+    }
+
+    if (type->IsPg()) {
+        SkipSkiffPg(static_cast<TPgType*>(type), buf);
+        return;
+    }
+
+    if (type->IsStruct()) {
+        auto structType = static_cast<TStructType*>(type);
+        // A non-null cookie is treated as a member reorder table (wire position -> member index).
+        const std::vector<size_t>* reorder = nullptr;
+        if (auto cookie = structType->GetCookie()) {
+            reorder = ((const std::vector<size_t>*)cookie);
+        }
+        for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+            SkipSkiffField(structType->GetMemberType(reorder ? reorder->at(i) : i), nativeYtTypeFlags, buf);
+        }
+        return;
+    }
+
+    if (type->IsList()) {
+        auto itemType = static_cast<TListType*>(type)->GetItemType();
+        // Each item is preceded by a zero byte; any other byte ends the list.
+        while (buf.Read() == '\0') {
+            SkipSkiffField(itemType, nativeYtTypeFlags, buf);
+        }
+        return;
+    }
+
+    if (type->IsTuple()) {
+        auto tupleType = static_cast<TTupleType*>(type);
+
+        for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+            SkipSkiffField(tupleType->GetElementType(i), nativeYtTypeFlags, buf);
+        }
+        return;
+    }
+
+    if (type->IsVariant()) {
+        auto varType = AS_TYPE(TVariantType, type);
+        // The alternative tag is one byte when there are fewer than 256 alternatives, two bytes otherwise.
+        ui16 data = 0;
+        if (varType->GetAlternativesCount() < 256) {
+            buf.ReadMany((char*)&data, 1);
+        } else {
+            buf.ReadMany((char*)&data, sizeof(data));
+        }
+
+        if (varType->GetUnderlyingType()->IsTuple()) {
+            auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType());
+            YQL_ENSURE(data < tupleType->GetElementsCount());
+            SkipSkiffField(tupleType->GetElementType(data), nativeYtTypeFlags, buf);
+        } else {
+            auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType());
+            if (auto cookie = structType->GetCookie()) {
+                const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
+                // The tag comes from the input: validate it before using it as an index into the table.
+                YQL_ENSURE(data < reorder.size(), "Bad variant alternative: " << data << ", only " << reorder.size() << " are available");
+                data = reorder[data];
+            }
+            YQL_ENSURE(data < structType->GetMembersCount());
+
+            SkipSkiffField(structType->GetMemberType(data), nativeYtTypeFlags, buf);
+        }
+        return;
+    }
+
+    // These kinds occupy no bytes on the wire.
+    if (type->IsVoid()) {
+        return;
+    }
+
+    if (type->IsNull()) {
+        return;
+    }
+
+    if (type->IsEmptyList() || type->IsEmptyDict()) {
+        return;
+    }
+
+    if (type->IsDict()) {
+        auto dictType = AS_TYPE(TDictType, type);
+        auto keyType = dictType->GetKeyType();
+        auto payloadType = dictType->GetPayloadType();
+        // Same framing as a list: a zero byte precedes every key/payload pair.
+        while (buf.Read() == '\0') {
+            SkipSkiffField(keyType, nativeYtTypeFlags, buf);
+            SkipSkiffField(payloadType, nativeYtTypeFlags, buf);
+        }
+        return;
+    }
+
+    YQL_ENSURE(false, "Unsupported type for skip: " << type->GetKindAsStr());
+}
+
+// Decodes one skiff-encoded value of the given type from buf.
+//
+// Data and Pg types are delegated to ReadSkiffData / ReadSkiffPg; Optional, Tuple, Struct, List,
+// Variant and Dict are decoded recursively; Void, Null, EmptyList and EmptyDict consume no input.
+//
+// type              - type of the value to decode.
+// nativeYtTypeFlags - forwarded to ReadSkiffData and to recursive calls.
+// holderFactory     - used to create the container values.
+// buf               - input positioned at the start of the value.
+//
+// Fails through YQL_ENSURE on an out-of-range variant alternative or an unsupported type kind.
+NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+    const NKikimr::NMiniKQL::THolderFactory& holderFactory, NCommon::TInputBuf& buf)
+{
+    if (type->IsData()) {
+        return ReadSkiffData(type, nativeYtTypeFlags, buf);
+    }
+
+    if (type->IsPg()) {
+        return ReadSkiffPg(static_cast<TPgType*>(type), buf);
+    }
+
+    if (type->IsOptional()) {
+        // One-byte marker; zero means the optional is empty.
+        auto marker = buf.Read();
+        if (!marker) {
+            return NUdf::TUnboxedValue();
+        }
+
+        auto value = ReadSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, holderFactory, buf);
+        return value.Release().MakeOptional();
+    }
+
+    if (type->IsTuple()) {
+        auto tupleType = AS_TYPE(TTupleType, type);
+        NUdf::TUnboxedValue* items;
+        auto value = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), items);
+        for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+            items[i] = ReadSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, holderFactory, buf);
+        }
+
+        return value;
+    }
+
+    if (type->IsStruct()) {
+        auto structType = AS_TYPE(TStructType, type);
+        NUdf::TUnboxedValue* items;
+        auto value = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), items);
+
+        if (auto cookie = type->GetCookie()) {
+            // A non-null cookie is treated as a reorder table: the i-th field on the wire
+            // is stored into member reorder[i].
+            const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
+            for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+                const auto ndx = reorder[i];
+                items[ndx] = ReadSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, holderFactory, buf);
+            }
+        } else {
+            for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+                items[i] = ReadSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, holderFactory, buf);
+            }
+        }
+
+        return value;
+    }
+
+    if (type->IsList()) {
+        auto itemType = AS_TYPE(TListType, type)->GetItemType();
+        TDefaultListRepresentation items;
+        // Each item is preceded by a zero byte; any other byte ends the list.
+        while (buf.Read() == '\0') {
+            items = items.Append(ReadSkiffNativeYtValue(itemType, nativeYtTypeFlags, holderFactory, buf));
+        }
+
+        return holderFactory.CreateDirectListHolder(std::move(items));
+    }
+
+    if (type->IsVariant()) {
+        auto varType = AS_TYPE(TVariantType, type);
+        // The alternative tag is one byte when there are fewer than 256 alternatives, two bytes otherwise.
+        ui16 data = 0;
+        if (varType->GetAlternativesCount() < 256) {
+            buf.ReadMany((char*)&data, 1);
+        } else {
+            buf.ReadMany((char*)&data, sizeof(data));
+        }
+        if (varType->GetUnderlyingType()->IsTuple()) {
+            auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType());
+            YQL_ENSURE(data < tupleType->GetElementsCount());
+            auto item = ReadSkiffNativeYtValue(tupleType->GetElementType(data), nativeYtTypeFlags, holderFactory, buf);
+            return holderFactory.CreateVariantHolder(item.Release(), data);
+        }
+        else {
+            auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType());
+            if (auto cookie = structType->GetCookie()) {
+                const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
+                // The tag comes from the input: validate it before using it as an index into the table.
+                YQL_ENSURE(data < reorder.size(), "Bad variant alternative: " << data << ", only " << reorder.size() << " are available");
+                data = reorder[data];
+            }
+            YQL_ENSURE(data < structType->GetMembersCount());
+
+            auto item = ReadSkiffNativeYtValue(structType->GetMemberType(data), nativeYtTypeFlags, holderFactory, buf);
+            return holderFactory.CreateVariantHolder(item.Release(), data);
+        }
+    }
+
+    // The remaining simple kinds consume no input.
+    if (type->IsVoid()) {
+        return NUdf::TUnboxedValue::Zero();
+    }
+
+    if (type->IsNull()) {
+        return NUdf::TUnboxedValue();
+    }
+
+    if (type->IsEmptyList() || type->IsEmptyDict()) {
+        return holderFactory.GetEmptyContainerLazy();
+    }
+
+    if (type->IsDict()) {
+        auto dictType = AS_TYPE(TDictType, type);
+        auto keyType = dictType->GetKeyType();
+        auto payloadType = dictType->GetPayloadType();
+
+        auto builder = holderFactory.NewDict(dictType, NUdf::TDictFlags::EDictKind::Hashed);
+        // Same framing as a list: a zero byte precedes every key/payload pair.
+        while (buf.Read() == '\0') {
+            auto key = ReadSkiffNativeYtValue(keyType, nativeYtTypeFlags, holderFactory, buf);
+            auto payload = ReadSkiffNativeYtValue(payloadType, nativeYtTypeFlags, holderFactory, buf);
+            builder->Add(std::move(key), std::move(payload));
+        }
+
+        return builder->Build();
+    }
+
+    YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
+}
+
+NUdf::TUnboxedValue ReadSkiffData(TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf) {
+ auto schemeType = static_cast<TDataType*>(type)->GetSchemeType();
+ switch (schemeType) {
+ case NUdf::TDataType<bool>::Id: {
+ ui8 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(data != 0);
+ }
+
+ case NUdf::TDataType<ui8>::Id: {
+ ui64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(ui8(data));
+ }
+
+ case NUdf::TDataType<i8>::Id: {
+ i64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(i8(data));
+ }
+
+ case NUdf::TDataType<NUdf::TDate>::Id:
+ case NUdf::TDataType<ui16>::Id: {
+ ui64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(ui16(data));
+ }
+
+ case NUdf::TDataType<i16>::Id: {
+ i64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(i16(data));
+ }
+
+ case NUdf::TDataType<NUdf::TDate32>::Id:
+ case NUdf::TDataType<i32>::Id: {
+ i64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(i32(data));
+ }
+
+ case NUdf::TDataType<NUdf::TDatetime>::Id:
+ case NUdf::TDataType<ui32>::Id: {
+ ui64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(ui32(data));
+ }
+
+ case NUdf::TDataType<NUdf::TInterval>::Id:
+ case NUdf::TDataType<NUdf::TInterval64>::Id:
+ case NUdf::TDataType<NUdf::TDatetime64>::Id:
+ case NUdf::TDataType<NUdf::TTimestamp64>::Id:
+ case NUdf::TDataType<i64>::Id: {
+ i64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(data);
+ }
+
+ case NUdf::TDataType<NUdf::TTimestamp>::Id:
+ case NUdf::TDataType<ui64>::Id: {
+ ui64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(data);
+ }
+
+ case NUdf::TDataType<float>::Id: {
+ double data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(float(data));
+ }
+
+ case NUdf::TDataType<double>::Id: {
+ double data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(data);
+ }
+
+ case NUdf::TDataType<NUdf::TUtf8>::Id:
+ case NUdf::TDataType<char*>::Id:
+ case NUdf::TDataType<NUdf::TJson>::Id:
+ case NUdf::TDataType<NUdf::TYson>::Id:
+ case NUdf::TDataType<NUdf::TDyNumber>::Id:
+ case NUdf::TDataType<NUdf::TUuid>::Id: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ auto str = NUdf::TUnboxedValue(MakeStringNotFilled(size));
+ buf.ReadMany(str.AsStringRef().Data(), size);
+ return str;
+ }
+
+ case NUdf::TDataType<NUdf::TDecimal>::Id: {
+ if (nativeYtTypeFlags & NTCF_DECIMAL) {
+ auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
+ if (params.first < 10) {
+ i32 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data));
+ } else if (params.first < 19) {
+ i64 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data));
+ } else {
+ YQL_ENSURE(params.first < 36);
+ NDecimal::TInt128 data;
+ buf.ReadMany((char*)&data, sizeof(data));
+ return NUdf::TUnboxedValuePod(NDecimal::FromYtDecimal(data));
+ }
+ } else {
+ ui32 size;
+ buf.ReadMany(reinterpret_cast<char*>(&size), sizeof(size));
+ const auto maxSize = sizeof(NDecimal::TInt128);
+ YQL_ENSURE(size > 0U && size <= maxSize, "Bad decimal field size: " << size);
+ char data[maxSize];
+ buf.ReadMany(data, size);
+ const auto& v = NDecimal::Deserialize(data, size);
+ YQL_ENSURE(!NDecimal::IsError(v.first), "Bad decimal field data: " << data);
+ YQL_ENSURE(size == v.second, "Bad decimal field size: " << size);
+ return NUdf::TUnboxedValuePod(v.first);
+ }
+ }
+
+ case NUdf::TDataType<NUdf::TTzDate>::Id: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ auto& vec = buf.YsonBuffer();
+ vec.resize(size);
+ buf.ReadMany(vec.data(), size);
+ ui16 value;
+ ui16 tzId;
+ YQL_ENSURE(DeserializeTzDate(TStringBuf(vec.begin(), vec.end()), value, tzId));
+ auto data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ auto& vec = buf.YsonBuffer();
+ vec.resize(size);
+ buf.ReadMany(vec.data(), size);
+ ui32 value;
+ ui16 tzId;
+ YQL_ENSURE(DeserializeTzDatetime(TStringBuf(vec.begin(), vec.end()), value, tzId));
+ auto data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ auto& vec = buf.YsonBuffer();
+ vec.resize(size);
+ buf.ReadMany(vec.data(), size);
+ ui64 value;
+ ui16 tzId;
+ YQL_ENSURE(DeserializeTzTimestamp(TStringBuf(vec.begin(), vec.end()), value, tzId));
+ auto data = NUdf::TUnboxedValuePod(value);
+ data.SetTimezoneId(tzId);
+ return data;
+ }
+
+ case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
+ ui32 size;
+ buf.ReadMany((char*)&size, sizeof(size));
+ CHECK_STRING_LENGTH_UNSIGNED(size);
+ auto json = NUdf::TUnboxedValue(MakeStringNotFilled(size));
+ buf.ReadMany(json.AsStringRef().Data(), size);
+ return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef());
+ }
+
+ default:
+ YQL_ENSURE(false, "Unsupported data type: " << schemeType);
+ }
+}
+// Writes one Data-typed value to buf in skiff layout; the encoding is picked by the type's scheme id and any unlisted id fails YQL_ENSURE.
+void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+ auto schemeType = static_cast<TDataType*>(type)->GetSchemeType(); // unchecked cast: the caller must pass a TDataType
+ switch (schemeType) { // multi-byte values below are copied from memory as-is, i.e. in host byte order
+ case NUdf::TDataType<bool>::Id: { // bool goes through the single-value buf.Write and is not widened like the integers below
+ ui8 data = value.Get<ui8>();
+ buf.Write(data);
+ break;
+ }
+
+ case NUdf::TDataType<ui8>::Id: { // unsigned integers narrower than 64 bits are widened to ui64 before writing
+ ui64 data = value.Get<ui8>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<i8>::Id: { // signed integers narrower than 64 bits are widened to i64 before writing
+ i64 data = value.Get<i8>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TDate>::Id: // TDate shares the ui16 path
+ case NUdf::TDataType<ui16>::Id: {
+ ui64 data = value.Get<ui16>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<i16>::Id: {
+ i64 data = value.Get<i16>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TDate32>::Id: // TDate32 shares the i32 path
+ case NUdf::TDataType<i32>::Id: {
+ i64 data = value.Get<i32>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TDatetime>::Id: // TDatetime shares the ui32 path
+ case NUdf::TDataType<ui32>::Id: {
+ ui64 data = value.Get<ui32>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TInterval>::Id: // every type in this group is fetched from the value as i64
+ case NUdf::TDataType<NUdf::TInterval64>::Id:
+ case NUdf::TDataType<NUdf::TDatetime64>::Id:
+ case NUdf::TDataType<NUdf::TTimestamp64>::Id:
+ case NUdf::TDataType<i64>::Id: {
+ i64 data = value.Get<i64>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTimestamp>::Id: // TTimestamp shares the ui64 path
+ case NUdf::TDataType<ui64>::Id: {
+ ui64 data = value.Get<ui64>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<float>::Id: {
+ double data = value.Get<float>(); // float is widened to double, so sizeof(double) bytes are written
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<double>::Id: {
+ double data = value.Get<double>();
+ buf.WriteMany((const char*)&data, sizeof(data));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TUtf8>::Id: // string-like types: ui32 byte length followed by the raw bytes
+ case NUdf::TDataType<char*>::Id:
+ case NUdf::TDataType<NUdf::TJson>::Id:
+ case NUdf::TDataType<NUdf::TYson>::Id:
+ case NUdf::TDataType<NUdf::TDyNumber>::Id:
+ case NUdf::TDataType<NUdf::TUuid>::Id: {
+ auto str = value.AsStringRef();
+ ui32 size = str.Size();
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany(str);
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TDecimal>::Id: {
+ if (nativeYtTypeFlags & NTCF_DECIMAL) { // flag set: fixed-width integer whose width depends on params.first (presumably the precision - confirm against TDataDecimalType)
+ auto const params = static_cast<TDataDecimalType*>(type)->GetParams();
+ const NDecimal::TInt128 data128 = value.GetInt128();
+ if (params.first < 10) { // below 10: written as i32
+ auto data = NDecimal::ToYtDecimal<i32>(data128);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ } else if (params.first < 19) { // 10..18: written as i64
+ auto data = NDecimal::ToYtDecimal<i64>(data128);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ } else { // 19..35: written as TInt128; 36 and above is rejected
+ YQL_ENSURE(params.first < 36);
+ auto data = NDecimal::ToYtDecimal<NDecimal::TInt128>(data128);
+ buf.WriteMany((const char*)&data, sizeof(data));
+ }
+ } else { // flag clear: ui32 length followed by the bytes produced by NDecimal::Serialize
+ char data[sizeof(NDecimal::TInt128)];
+ const ui32 size = NDecimal::Serialize(value.GetInt128(), data);
+ buf.WriteMany(reinterpret_cast<const char*>(&size), sizeof(size));
+ buf.WriteMany(data, size);
+ }
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDate>::Id: { // Tz* types: ui32 payload size, then the byte-swapped value, then the byte-swapped timezone id
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui16 data = SwapBytes(value.Get<ui16>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui32 data = SwapBytes(value.Get<ui32>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui64 data = SwapBytes(value.Get<ui64>());
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDate32>::Id: { // signed Tz* types: same layout as above, plus a sign-bit flip on the value
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>()); // 0x80 toggles bit 7 of the swapped word's low byte, i.e. the original sign bit (assuming SwapBytes reverses byte order)
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); // same sign-bit flip as TTzDate32, on a 64-bit value
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
+ ui16 tzId = SwapBytes(value.GetTimezoneId());
+ ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>()); // same sign-bit flip as TTzDate32, on a 64-bit value
+ ui32 size = sizeof(data) + sizeof(tzId);
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany((const char*)&data, sizeof(data));
+ buf.WriteMany((const char*)&tzId, sizeof(tzId));
+ break;
+ }
+
+ case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
+ NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value); // converted via ValueToString first, then written like the string types (ui32 length + bytes)
+ auto str = json.AsStringRef();
+ ui32 size = str.Size();
+ buf.WriteMany((const char*)&size, sizeof(size));
+ buf.WriteMany(str);
+ break;
+ }
+
+ default: // every scheme type not listed above is rejected
+ YQL_ENSURE(false, "Unsupported data type: " << schemeType);
+ }
+}
+// Recursively writes value of the given type to buf in skiff layout, dispatching on the type kind; an unhandled kind fails YQL_ENSURE.
+void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+ if (type->IsData()) { // scalar data is delegated to WriteSkiffData
+ WriteSkiffData(type, nativeYtTypeFlags, value, buf);
+ } else if (type->IsPg()) {
+ WriteSkiffPgValue(static_cast<TPgType*>(type), value, buf);
+ } else if (type->IsOptional()) { // optional: tag byte 0 when empty, otherwise tag byte 1 followed by the payload
+ if (!value) {
+ buf.Write('\0');
+ return;
+ }
+
+ buf.Write('\1');
+ WriteSkiffNativeYtValue(AS_TYPE(TOptionalType, type)->GetItemType(), nativeYtTypeFlags, value.GetOptionalValue(), buf);
+ } else if (type->IsList()) { // list: each item is preceded by a 0 byte; a 0xff byte terminates the list
+ auto itemType = AS_TYPE(TListType, type)->GetItemType();
+ auto elements = value.GetElements();
+ if (elements) { // use the direct element array when GetElements() returns one, else fall back to the iterator
+ ui32 size = value.GetListLength();
+ for (ui32 i = 0; i < size; ++i) {
+ buf.Write('\0');
+ WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, elements[i], buf);
+ }
+ } else {
+ NUdf::TUnboxedValue item;
+ for (auto iter = value.GetListIterator(); iter.Next(item); ) {
+ buf.Write('\0');
+ WriteSkiffNativeYtValue(itemType, nativeYtTypeFlags, item, buf);
+ }
+ }
+
+ buf.Write('\xff');
+ } else if (type->IsTuple()) { // tuple: elements are written back to back with no separators or terminator
+ auto tupleType = AS_TYPE(TTupleType, type);
+ auto elements = value.GetElements();
+ if (elements) {
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, elements[i], buf);
+ }
+ } else {
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ WriteSkiffNativeYtValue(tupleType->GetElementType(i), nativeYtTypeFlags, value.GetElement(i), buf);
+ }
+ }
+ } else if (type->IsStruct()) { // struct: members are written back to back, optionally in a remapped order
+ auto structType = AS_TYPE(TStructType, type);
+ auto elements = value.GetElements();
+ if (auto cookie = type->GetCookie()) { // a non-null cookie is read as a std::vector<size_t> giving the member index to emit at each output position
+ const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
+ if (elements) {
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ const auto ndx = reorder[i];
+ WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, elements[ndx], buf);
+ }
+ } else {
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ const auto ndx = reorder[i];
+ WriteSkiffNativeYtValue(structType->GetMemberType(ndx), nativeYtTypeFlags, value.GetElement(ndx), buf);
+ }
+ }
+ } else { // no cookie: members are written in declaration order
+ if (elements) {
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, elements[i], buf);
+ }
+ } else {
+ for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
+ WriteSkiffNativeYtValue(structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), buf);
+ }
+ }
+ }
+ } else if (type->IsVariant()) { // variant: alternative index first, then the selected alternative's payload
+ auto varType = AS_TYPE(TVariantType, type);
+ ui16 index = (ui16)value.GetVariantIndex();
+ if (varType->GetAlternativesCount() < 256) { // index takes 1 byte for fewer than 256 alternatives, 2 bytes otherwise
+ buf.WriteMany((const char*)&index, 1); // NOTE(review): first in-memory byte of a ui16 is its low byte only on little-endian hosts
+ } else {
+ buf.WriteMany((const char*)&index, sizeof(index));
+ }
+
+ if (varType->GetUnderlyingType()->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, varType->GetUnderlyingType());
+ WriteSkiffNativeYtValue(tupleType->GetElementType(index), nativeYtTypeFlags, value.GetVariantItem(), buf); // NOTE(review): index is not range-checked here, unlike the struct branch below
+ } else {
+ auto structType = AS_TYPE(TStructType, varType->GetUnderlyingType());
+ if (auto cookie = structType->GetCookie()) {
+ const std::vector<size_t>& reorder = *((const std::vector<size_t>*)cookie);
+ index = reorder[index]; // the un-remapped index was already written above; only the member type lookup uses the remapped one
+ }
+ YQL_ENSURE(index < structType->GetMembersCount());
+
+ WriteSkiffNativeYtValue(structType->GetMemberType(index), nativeYtTypeFlags, value.GetVariantItem(), buf);
+ }
+ } else if (type->IsVoid() || type->IsNull() || type->IsEmptyList() || type->IsEmptyDict()) { // these kinds produce no output at all
+ } else if (type->IsDict()) { // dict: each (key, payload) pair is preceded by a 0 byte; a 0xff byte terminates
+ auto dictType = AS_TYPE(TDictType, type);
+ auto keyType = dictType->GetKeyType();
+ auto payloadType = dictType->GetPayloadType();
+ NUdf::TUnboxedValue key, payload;
+ for (auto iter = value.GetDictIterator(); iter.NextPair(key, payload); ) {
+ buf.Write('\0');
+ WriteSkiffNativeYtValue(keyType, nativeYtTypeFlags, key, buf);
+ WriteSkiffNativeYtValue(payloadType, nativeYtTypeFlags, payload, buf);
+ }
+
+ buf.Write('\xff');
+ } else {
+ YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
+ }
+}
+// C-linkage wrapper that forwards its arguments unchanged to WriteSkiffNativeYtValue (presumably so it can be referenced by an unmangled name - confirm against the codegen callers).
+extern "C" void WriteContainerNativeYtValue(TType* type, ui64 nativeYtTypeFlags, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+ WriteSkiffNativeYtValue(type, nativeYtTypeFlags, value, buf);
+}
+
} // NYql
diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h
index c7bae246ac..f7ea0e0b78 100644
--- a/yt/yql/providers/yt/codec/yt_codec.h
+++ b/yt/yql/providers/yt/codec/yt_codec.h
@@ -257,4 +257,30 @@ public:
using IMkqlWriterImplPtr = TIntrusivePtr<IMkqlWriterImpl>;
+NKikimr::NUdf::TUnboxedValue ReadYsonValueInTableFormat(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, char cmd, NCommon::TInputBuf& buf); // cmd: a byte the caller supplies; at least one caller in yt_codec_io.cpp reads it from the buffer (`char cmd = Buf_.Read();`) just before the call
+TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonValueInTableFormat(const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const TStringBuf& yson, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err);
+TMaybe<NKikimr::NUdf::TUnboxedValue> ParseYsonNode(const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const NYT::TNode& node, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, IOutputStream* err); // the caller in res_or_pull.cpp treats an empty result as failure and reports the text written to err
+extern "C" void ReadYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf,
+ bool wrapOptional);
+extern "C" void ReadContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf,
+ bool wrapOptional);
+void WriteYsonValueInTableFormat(NCommon::TOutputBuf& buf, NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, bool topLevel);
+extern "C" void WriteYsonContainerValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
+// Skiff codec entry points; WriteContainerNativeYtValue is the extern "C" form that forwards to WriteSkiffNativeYtValue.
+void SkipSkiffField(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf);
+NKikimr::NUdf::TUnboxedValue ReadSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, NCommon::TInputBuf& buf);
+NKikimr::NUdf::TUnboxedValue ReadSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, NCommon::TInputBuf& buf);
+void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf); // type must be a data type: the definition static_casts it to TDataType without a check
+void WriteSkiffNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
+extern "C" void WriteContainerNativeYtValue(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags,
+ const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf);
+
} // NYql
diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp
index aa47453ea2..82b1a3e2f8 100644
--- a/yt/yql/providers/yt/codec/yt_codec_io.cpp
+++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp
@@ -901,12 +901,12 @@ protected:
return NUdf::TUnboxedValue();
}
auto& decoder = *SpecsCache_.GetSpecs().Inputs[TableIndex_];
- auto val = ReadYsonValue((decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? type : uwrappedType, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_, true);
+ auto val = ReadYsonValueInTableFormat((decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? type : uwrappedType, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_);
return (decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? val : val.Release().MakeOptional();
} else {
if (Y_LIKELY(cmd != EntitySymbol)) {
auto& decoder = *SpecsCache_.GetSpecs().Inputs[TableIndex_];
- return ReadYsonValue(type, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_, true);
+ return ReadYsonValueInTableFormat(type, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_);
}
if (type->GetKind() == TType::EKind::Data && static_cast<TDataType*>(type)->GetSchemeType() == NUdf::TDataType<NUdf::TYson>::Id) {
@@ -1367,7 +1367,7 @@ protected:
}
if (uwrappedType->IsData()) {
- return NCommon::ReadSkiffData(uwrappedType, 0, Buf_);
+ return ReadSkiffData(uwrappedType, 0, Buf_);
} else if (!isOptional && uwrappedType->IsPg()) {
return NCommon::ReadSkiffPg(static_cast<TPgType*>(uwrappedType), Buf_);
} else {
@@ -1378,17 +1378,17 @@ protected:
// parse binary yson...
YQL_ENSURE(size > 0);
char cmd = Buf_.Read();
- auto value = ReadYsonValue(uwrappedType, 0, SpecsCache_.GetHolderFactory(), cmd, Buf_, true);
+ auto value = ReadYsonValueInTableFormat(uwrappedType, 0, SpecsCache_.GetHolderFactory(), cmd, Buf_);
return isOptional ? value.Release().MakeOptional() : value;
}
}
NUdf::TUnboxedValue ReadSkiffFieldNativeYt(TType* type, ui64 nativeYtTypeFlags) {
- return NCommon::ReadSkiffNativeYtValue(type, nativeYtTypeFlags, SpecsCache_.GetHolderFactory(), Buf_);
+ return ReadSkiffNativeYtValue(type, nativeYtTypeFlags, SpecsCache_.GetHolderFactory(), Buf_);
}
void SkipSkiffField(TType* type, ui64 nativeYtTypeFlags) {
- return NCommon::SkipSkiffField(type, nativeYtTypeFlags, Buf_);
+ return ::NYql::SkipSkiffField(type, nativeYtTypeFlags, Buf_);
}
};
@@ -2065,9 +2065,9 @@ protected:
void WriteSkiffValue(TType* type, const NUdf::TUnboxedValuePod& value, bool wasOptional) {
if (NativeYtTypeFlags_) {
- NCommon::WriteSkiffNativeYtValue(type, NativeYtTypeFlags_, value, Buf_);
+ WriteSkiffNativeYtValue(type, NativeYtTypeFlags_, value, Buf_);
} else if (type->IsData()) {
- NCommon::WriteSkiffData(type, 0, value, Buf_);
+ WriteSkiffData(type, 0, value, Buf_);
} else if (!wasOptional && type->IsPg()) {
NCommon::WriteSkiffPg(static_cast<TPgType*>(type), value, Buf_);
} else {
diff --git a/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp b/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp
index d7800dc2d7..c901181ff5 100644
--- a/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp
+++ b/yt/yql/providers/yt/lib/res_pull/res_or_pull.cpp
@@ -197,7 +197,7 @@ bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TNod
YQL_ENSURE(rec.GetType() == NYT::TNode::EType::Map, "Expected map node");
TStringStream err;
- auto value = NCommon::ParseYsonNode(specsCache.GetHolderFactory(), rec, Specs.Outputs[0].RowType, Specs.Outputs[0].NativeYtTypeFlags, &err);
+ auto value = ParseYsonNode(specsCache.GetHolderFactory(), rec, Specs.Outputs[0].RowType, Specs.Outputs[0].NativeYtTypeFlags, &err);
if (!value) {
throw yexception() << "Could not parse yson node with error: " << err.Str();
}