summaryrefslogtreecommitdiffstats
path: root/yql/essentials/udfs/logs/dsv/dsv_udf.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2025-10-06 13:26:25 +0300
committervvvv <[email protected]>2025-10-06 14:06:25 +0300
commiteca8ce9cb1613d5c983185c4e43c20651a9638aa (patch)
tree61ee5ae779948e61af9a7691d19eaa2c09869121 /yql/essentials/udfs/logs/dsv/dsv_udf.cpp
parent4adf7eecae16a9b228b28cc5f64c27ef69ad5ec2 (diff)
YQL-20086 udfs
init commit_hash:f9684778bf1ea956965f2360b80b91edb7d4ffbe
Diffstat (limited to 'yql/essentials/udfs/logs/dsv/dsv_udf.cpp')
-rw-r--r--yql/essentials/udfs/logs/dsv/dsv_udf.cpp123
1 files changed, 56 insertions, 67 deletions
diff --git a/yql/essentials/udfs/logs/dsv/dsv_udf.cpp b/yql/essentials/udfs/logs/dsv/dsv_udf.cpp
index 421e36a0100..8d66253a32b 100644
--- a/yql/essentials/udfs/logs/dsv/dsv_udf.cpp
+++ b/yql/essentials/udfs/logs/dsv/dsv_udf.cpp
@@ -11,15 +11,13 @@ using namespace NUdf;
namespace {
-struct TKsvIndexes
-{
+struct TKsvIndexes {
ui32 Key;
ui32 Subkey;
ui32 Value;
};
-struct TResultIndexes
-{
+struct TResultIndexes {
TType* DictType;
ui32 Key;
@@ -40,28 +38,26 @@ void ParseDsv(const TUnboxedValuePod& value,
const auto from = std::distance(input.begin(), part.begin());
builder->Add(
valueBuilder->SubString(value, from, pos),
- valueBuilder->SubString(value, from + pos + 1U, part.length() - pos - 1U)
- );
+ valueBuilder->SubString(value, from + pos + 1U, part.length() - pos - 1U));
}
}
}
-class TDsvReadRecord: public TBoxedValue
-{
+class TDsvReadRecord: public TBoxedValue {
public:
- class TFactory : public TBoxedValue {
+ class TFactory: public TBoxedValue {
public:
TFactory(const TResultIndexes& fieldIndexes,
const TKsvIndexes& ksvIndexes)
- : ResultIndexes_(fieldIndexes)
- , KsvIndexes_(ksvIndexes)
+ : ResultIndexes_(fieldIndexes)
+ , KsvIndexes_(ksvIndexes)
{
}
+
private:
TUnboxedValue Run(
const IValueBuilder* valueBuilder,
- const TUnboxedValuePod* args) const final try
- {
+ const TUnboxedValuePod* args) const final try {
const auto optRunConfig = args[0];
TUnboxedValue separator;
if (optRunConfig && !optRunConfig.AsStringRef().Empty()) {
@@ -71,8 +67,7 @@ public:
}
return TUnboxedValuePod(new TDsvReadRecord(separator, ResultIndexes_, KsvIndexes_));
- }
- catch (const std::exception& e) {
+ } catch (const std::exception& e) {
UdfTerminate(e.what());
}
@@ -88,16 +83,15 @@ public:
, KsvIndexes_(ksvIndexes)
{
}
+
private:
TUnboxedValue Run(
- const IValueBuilder* valueBuilder,
- const TUnboxedValuePod* args) const final try
- {
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try {
auto keyData = args[0].GetElement(KsvIndexes_.Key);
auto subkeyData = args[0].GetElement(KsvIndexes_.Subkey);
auto valueData = args[0].GetElement(KsvIndexes_.Value);
-
auto dict = valueBuilder->NewDict(ResultIndexes_.DictType, 0);
ParseDsv(valueData, Separator_.AsStringRef(), valueBuilder, dict.Get());
@@ -108,8 +102,7 @@ private:
items[ResultIndexes_.Subkey] = subkeyData;
items[ResultIndexes_.Dict] = dict->Build();
return result;
- }
- catch (const std::exception& e) {
+ } catch (const std::exception& e) {
UdfTerminate(e.what());
}
@@ -118,48 +111,46 @@ private:
const TKsvIndexes KsvIndexes_;
};
-class TDsvParse: public TBoxedValue
-{
+class TDsvParse: public TBoxedValue {
public:
explicit TDsvParse(TType* dictType)
: DictType_(dictType)
- {}
+ {
+ }
+
private:
TUnboxedValue Run(
- const IValueBuilder* valueBuilder,
- const TUnboxedValuePod* args) const final try
- {
- const std::string_view separator = args[1] ?
- std::string_view(args[1].AsStringRef()):
- std::string_view("\t");
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try {
+ const std::string_view separator = args[1] ? std::string_view(args[1].AsStringRef()) : std::string_view("\t");
auto dict = valueBuilder->NewDict(DictType_, 0);
ParseDsv(args[0], separator, valueBuilder, dict.Get());
return dict->Build();
- }
- catch (const std::exception& e) {
+ } catch (const std::exception& e) {
UdfTerminate(e.what());
}
const TType* DictType_;
};
-#define TYPE_TO_STRING(type) \
-case TDataType<type>::Id: part += ToString(member.Get<type>()); break;
+#define TYPE_TO_STRING(type) \
+ case TDataType<type>::Id: \
+ part += ToString(member.Get<type>()); \
+ break;
-class TDsvSerialize: public TBoxedValue
-{
+class TDsvSerialize: public TBoxedValue {
public:
explicit TDsvSerialize(const TVector<TDataTypeId>& typeIds, TStructTypeInspector* structInspector)
: TypeIds_(typeIds)
, StructInspector_(structInspector)
- {}
+ {
+ }
private:
TUnboxedValue Run(
- const IValueBuilder* valueBuilder,
- const TUnboxedValuePod* args) const final try
- {
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try {
TVector<TString> result;
if (const ui32 structSize = StructInspector_->GetMembersCount()) {
result.reserve(structSize);
@@ -179,14 +170,12 @@ private:
default:
part += member.AsStringRef();
break;
-
}
result.emplace_back(std::move(part));
}
}
return valueBuilder->NewString(JoinStrings(result, "\t"));
- }
- catch (const std::exception& e) {
+ } catch (const std::exception& e) {
UdfTerminate(e.what());
}
@@ -194,14 +183,14 @@ private:
THolder<TStructTypeInspector> StructInspector_;
};
-class TDsvModule: public IUdfModule
-{
+class TDsvModule: public IUdfModule {
public:
TStringRef Name() const {
return TStringRef::Of("Dsv");
}
- void CleanupOnTerminate() const final {}
+ void CleanupOnTerminate() const final {
+ }
void GetAllFunctions(IFunctionsSink& sink) const final {
sink.Add(TStringRef::Of("ReadRecord"));
@@ -210,39 +199,36 @@ public:
}
void BuildFunctionTypeInfo(
- const TStringRef& name,
- TType* userType,
- const TStringRef& typeConfig,
- ui32 flags,
- IFunctionTypeInfoBuilder& builder) const final try
- {
+ const TStringRef& name,
+ TType* userType,
+ const TStringRef& typeConfig,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& builder) const final try {
Y_UNUSED(typeConfig);
bool typesOnly = (flags & TFlags::TypesOnly);
if (TStringRef::Of("ReadRecord") == name) {
TKsvIndexes ksvIndexes;
- auto recordType = builder.Struct(3U)->
- AddField<char*>("key", &ksvIndexes.Key)
- .AddField<char*>("subkey", &ksvIndexes.Subkey)
- .AddField<char*>("value", &ksvIndexes.Value)
- .Build();
+ auto recordType = builder.Struct(3U)->AddField<char*>("key", &ksvIndexes.Key).AddField<char*>("subkey", &ksvIndexes.Subkey).AddField<char*>("value", &ksvIndexes.Value).Build();
TResultIndexes resultIndexes;
resultIndexes.DictType = builder.Dict()->Key<char*>().Value<char*>().Build();
const auto structType = builder.Struct(resultIndexes.FieldsCount)
- ->AddField<char*>("key", &resultIndexes.Key)
- .AddField<char*>("subkey", &resultIndexes.Subkey)
- .AddField("dict", resultIndexes.DictType, &resultIndexes.Dict)
- .Build();
+ ->AddField<char*>("key", &resultIndexes.Key)
+ .AddField<char*>("subkey", &resultIndexes.Subkey)
+ .AddField("dict", resultIndexes.DictType, &resultIndexes.Dict)
+ .Build();
builder.Returns(structType)
- .Args()->Add(recordType).Done()
- .RunConfig<TOptional<char*>>();
+ .Args()
+ ->Add(recordType)
+ .Done()
+ .RunConfig<TOptional<char*>>();
if (!typesOnly) {
builder.Implementation(new TDsvReadRecord::TFactory(
- resultIndexes, ksvIndexes));
+ resultIndexes, ksvIndexes));
}
builder.IsStrict();
} else if (TStringRef::Of("Parse") == name) {
@@ -250,8 +236,12 @@ public:
auto dictType = builder.Dict()->Key<char*>().Value<char*>().Build();
builder.Returns(dictType)
- .Args()->Add<char*>().Flags(ICallablePayload::TArgumentFlags::AutoMap).Add(optionalStringType).Done()
- .OptionalArgs(1);
+ .Args()
+ ->Add<char*>()
+ .Flags(ICallablePayload::TArgumentFlags::AutoMap)
+ .Add(optionalStringType)
+ .Done()
+ .OptionalArgs(1);
if (!typesOnly) {
builder.Implementation(new TDsvParse(dictType));
@@ -304,7 +294,6 @@ public:
builder.Implementation(new TDsvSerialize(typeIds, structInspector.Release()));
}
builder.IsStrict();
-
}
} catch (const std::exception& e) {
builder.SetError(CurrentExceptionMessage());