1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
#include <yql/essentials/minikql/protobuf_udf/type_builder.h>
#include <yql/essentials/minikql/protobuf_udf/value_builder.h>
#include <yql/essentials/public/udf/udf_value.h>
#include <yql/essentials/public/udf/udf_registrator.h>
#include <library/cpp/protobuf/yql/descriptor.h>
using namespace NKikimr::NUdf;
using namespace NProtoBuf;
namespace {
class TDynamicProtoValue: public TProtobufValue {
public:
TDynamicProtoValue(const TProtoInfo& info, TDynamicInfoRef dyn)
: TProtobufValue(info)
, Dynamic_(dyn)
{
Y_ASSERT(Dynamic_ != nullptr);
}
TAutoPtr<Message> Parse(const TStringBuf& data) const override {
return Dynamic_->Parse(data);
}
private:
TDynamicInfoRef Dynamic_;
};
class TDynamicProtoSerialize: public TProtobufSerialize {
public:
TDynamicProtoSerialize(const TProtoInfo& info, TDynamicInfoRef dyn)
: TProtobufSerialize(info)
, Dynamic_(dyn)
{
Y_ASSERT(Dynamic_ != nullptr);
}
TMaybe<TString> Serialize(const Message& proto) const override {
return Dynamic_->Serialize(proto);
}
TAutoPtr<Message> MakeProto() const override {
return Dynamic_->MakeProto();
}
private:
TDynamicInfoRef Dynamic_;
};
class TDynamicProtoValueSafe: public TDynamicProtoValue {
public:
TDynamicProtoValueSafe(const TProtoInfo& info, TDynamicInfoRef dyn)
: TDynamicProtoValue(info, dyn) {}
TAutoPtr<Message> Parse(const TStringBuf& data) const override {
try {
return TDynamicProtoValue::Parse(data);
} catch (const std::exception& e) {
return nullptr;
}
}
};
class TProtobufModule: public IUdfModule {
public:
TStringRef Name() const {
return TStringRef("Protobuf");
}
void CleanupOnTerminate() const final {
}
void GetAllFunctions(IFunctionsSink& sink) const final {
sink.Add(TStringRef::Of("Parse"))->SetTypeAwareness();
sink.Add(TStringRef::Of("TryParse"))->SetTypeAwareness();
sink.Add(TStringRef::Of("Serialize"))->SetTypeAwareness();
}
void BuildFunctionTypeInfo(
const TStringRef& name,
TType* userType,
const TStringRef& typeConfig,
ui32 flags,
IFunctionTypeInfoBuilder& builder) const final {
Y_UNUSED(userType);
try {
auto dyn = TDynamicInfo::Create(TStringBuf(typeConfig.Data(), typeConfig.Size()));
TProtoInfo typeInfo;
ProtoTypeBuild(dyn->Descriptor(),
dyn->GetEnumFormat(),
dyn->GetRecursionTraits(),
dyn->GetOptionalLists(),
builder, &typeInfo,
EProtoStringYqlType::Bytes,
dyn->GetSyntaxAware(),
false,
dyn->GetYtMode());
auto stringType = builder.SimpleType<char*>();
auto structType = typeInfo.StructType;
auto optionalStructType = builder.Optional()->Item(structType).Build();
if (TStringRef::Of("Serialize") == name) {
// function signature:
// String Serialize(Protobuf value)
builder.Returns(stringType)
.Args()
->Add(structType)
.Flags(ICallablePayload::TArgumentFlags::AutoMap)
.Done();
if ((flags & TFlags::TypesOnly) == 0) {
builder.Implementation(new TDynamicProtoSerialize(typeInfo, dyn));
}
} else {
// function signature:
// Protobuf Parse(String value)
builder.Returns((TStringRef::Of("TryParse") == name) ? optionalStructType : structType)
.Args()
->Add(stringType)
.Flags(ICallablePayload::TArgumentFlags::AutoMap)
.Done();
if (TStringRef::Of("Parse") == name) {
if ((flags & TFlags::TypesOnly) == 0) {
builder.Implementation(new TDynamicProtoValue(typeInfo, dyn));
}
} else if (TStringRef::Of("TryParse") == name) {
if ((flags & TFlags::TypesOnly) == 0) {
builder.Implementation(new TDynamicProtoValueSafe(typeInfo, dyn));
}
}
}
} catch (const std::exception& e) {
builder.SetError(CurrentExceptionMessage());
}
}
};
}
REGISTER_MODULES(TProtobufModule);
|