aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2024-11-05 17:12:50 +0300
committerimunkin <imunkin@yandex-team.com>2024-11-05 17:30:29 +0300
commitd757e0df81792a1c29f9bd22e7775a79cbeaeadf (patch)
treee0a6e3f5c71fd0ebd28f83bf16f4e6d9eda1a217
parent8a336ec2bfffc9c41099e9f31913d6c943362a24 (diff)
downloadydb-d757e0df81792a1c29f9bd22e7775a79cbeaeadf.tar.gz
Move yql/udfs/logs/ to /yql/essentials YQL-19206
commit_hash:7137f6ca04c4afa838eea547ba246547f4442724
-rw-r--r--yql/essentials/udfs/logs/dsv/dsv_udf.cpp316
-rw-r--r--yql/essentials/udfs/logs/dsv/ya.make17
-rw-r--r--yql/essentials/udfs/logs/ya.make3
-rw-r--r--yql/essentials/udfs/ya.make1
4 files changed, 337 insertions, 0 deletions
diff --git a/yql/essentials/udfs/logs/dsv/dsv_udf.cpp b/yql/essentials/udfs/logs/dsv/dsv_udf.cpp
new file mode 100644
index 00000000000..633e5f49ddb
--- /dev/null
+++ b/yql/essentials/udfs/logs/dsv/dsv_udf.cpp
@@ -0,0 +1,316 @@
+#include <yql/essentials/public/udf/udf_helpers.h>
+#include <yql/essentials/public/udf/udf_value_builder.h>
+#include <yql/essentials/public/udf/udf_type_inspection.h>
+
+#include <util/generic/yexception.h>
+#include <library/cpp/deprecated/split/split_iterator.h>
+#include <util/string/vector.h>
+
+using namespace NKikimr;
+using namespace NUdf;
+
+namespace {
+
+struct TKsvIndexes
+{
+ ui32 key;
+ ui32 subkey;
+ ui32 value;
+};
+
+struct TResultIndexes
+{
+ TType* DictType;
+
+ ui32 key;
+ ui32 subkey;
+ ui32 dict;
+ static constexpr ui32 FieldsCount = 3U;
+};
+
+void ParseDsv(const TUnboxedValuePod& value,
+ const std::string_view& separator,
+ const IValueBuilder* valueBuilder,
+ IDictValueBuilder* builder) {
+ const std::string_view input(value.AsStringRef());
+ const std::vector<std::string_view> parts = StringSplitter(input).SplitByString(separator);
+ for (const auto& part : parts) {
+ const auto pos = part.find('=');
+ if (std::string_view::npos != pos) {
+ const auto from = std::distance(input.begin(), part.begin());
+ builder->Add(
+ valueBuilder->SubString(value, from, pos),
+ valueBuilder->SubString(value, from + pos + 1U, part.length() - pos - 1U)
+ );
+ }
+ }
+}
+
+class TDsvReadRecord: public TBoxedValue
+{
+public:
+ class TFactory : public TBoxedValue {
+ public:
+ TFactory(const TResultIndexes& fieldIndexes,
+ const TKsvIndexes& ksvIndexes)
+ : ResultIndexes_(fieldIndexes)
+ , KsvIndexes_(ksvIndexes)
+ {
+ }
+ private:
+ TUnboxedValue Run(
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try
+ {
+ const auto optRunConfig = args[0];
+ TUnboxedValue separator;
+ if (optRunConfig && !optRunConfig.AsStringRef().Empty()) {
+ separator = optRunConfig;
+ } else {
+ separator = valueBuilder->NewString("\t");
+ }
+
+ return TUnboxedValuePod(new TDsvReadRecord(separator, ResultIndexes_, KsvIndexes_));
+ }
+ catch (const std::exception& e) {
+ UdfTerminate(e.what());
+ }
+
+ const TResultIndexes ResultIndexes_;
+ const TKsvIndexes KsvIndexes_;
+ };
+
+ explicit TDsvReadRecord(const TUnboxedValue& separator,
+ const TResultIndexes& fieldIndexes,
+ const TKsvIndexes& ksvIndexes)
+ : Separator_(std::move(separator))
+ , ResultIndexes_(fieldIndexes)
+ , KsvIndexes_(ksvIndexes)
+ {
+ }
+private:
+ TUnboxedValue Run(
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try
+ {
+ auto keyData = args[0].GetElement(KsvIndexes_.key);
+ auto subkeyData = args[0].GetElement(KsvIndexes_.subkey);
+ auto valueData = args[0].GetElement(KsvIndexes_.value);
+
+
+ auto dict = valueBuilder->NewDict(ResultIndexes_.DictType, 0);
+
+ ParseDsv(valueData, Separator_.AsStringRef(), valueBuilder, dict.Get());
+
+ TUnboxedValue* items = nullptr;
+ const auto result = valueBuilder->NewArray(ResultIndexes_.FieldsCount, items);
+ items[ResultIndexes_.key] = keyData;
+ items[ResultIndexes_.subkey] = subkeyData;
+ items[ResultIndexes_.dict] = dict->Build();
+ return result;
+ }
+ catch (const std::exception& e) {
+ UdfTerminate(e.what());
+ }
+
+ const TUnboxedValue Separator_;
+ const TResultIndexes ResultIndexes_;
+ const TKsvIndexes KsvIndexes_;
+};
+
+class TDsvParse: public TBoxedValue
+{
+public:
+ explicit TDsvParse(TType* dictType)
+ : DictType(dictType)
+ {}
+private:
+ TUnboxedValue Run(
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try
+ {
+ const std::string_view separator = args[1] ?
+ std::string_view(args[1].AsStringRef()):
+ std::string_view("\t");
+
+ auto dict = valueBuilder->NewDict(DictType, 0);
+ ParseDsv(args[0], separator, valueBuilder, dict.Get());
+ return dict->Build();
+ }
+ catch (const std::exception& e) {
+ UdfTerminate(e.what());
+ }
+
+ const TType* DictType;
+};
+
+#define TYPE_TO_STRING(type) \
+case TDataType<type>::Id: part += ToString(member.Get<type>()); break;
+
+class TDsvSerialize: public TBoxedValue
+{
+public:
+ explicit TDsvSerialize(const TVector<TDataTypeId>& typeIds, TStructTypeInspector* structInspector)
+ : TypeIds(typeIds)
+ , StructInspector(structInspector)
+ {}
+
+private:
+ TUnboxedValue Run(
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const final try
+ {
+ TVector<TString> result;
+ if (const ui32 structSize = StructInspector->GetMembersCount()) {
+ result.reserve(structSize);
+ for (ui32 i = 0; i < structSize; ++i) {
+ auto part = TString(StructInspector->GetMemberName(i));
+ part += '=';
+ const TUnboxedValue& member = args[0].GetElement(i);
+ switch (TypeIds[i]) {
+ TYPE_TO_STRING(i32)
+ TYPE_TO_STRING(ui32)
+ TYPE_TO_STRING(i64)
+ TYPE_TO_STRING(ui64)
+ TYPE_TO_STRING(ui8)
+ TYPE_TO_STRING(bool)
+ TYPE_TO_STRING(double)
+ TYPE_TO_STRING(float)
+ default:
+ part += member.AsStringRef();
+ break;
+
+ }
+ result.emplace_back(std::move(part));
+ }
+ }
+ return valueBuilder->NewString(JoinStrings(result, "\t"));
+ }
+ catch (const std::exception& e) {
+ UdfTerminate(e.what());
+ }
+
+ const TVector<TDataTypeId> TypeIds;
+ THolder<TStructTypeInspector> StructInspector;
+};
+
+class TDsvModule: public IUdfModule
+{
+public:
+ TStringRef Name() const {
+ return TStringRef::Of("Dsv");
+ }
+
+ void CleanupOnTerminate() const final {}
+
+ void GetAllFunctions(IFunctionsSink& sink) const final {
+ sink.Add(TStringRef::Of("ReadRecord"));
+ sink.Add(TStringRef::Of("Parse"));
+ sink.Add(TStringRef::Of("Serialize"))->SetTypeAwareness();
+ }
+
+ void BuildFunctionTypeInfo(
+ const TStringRef& name,
+ TType* userType,
+ const TStringRef& typeConfig,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& builder) const final try
+ {
+ Y_UNUSED(typeConfig);
+
+ bool typesOnly = (flags & TFlags::TypesOnly);
+
+ if (TStringRef::Of("ReadRecord") == name) {
+ TKsvIndexes ksvIndexes;
+ auto recordType = builder.Struct(3U)->
+ AddField<char*>("key", &ksvIndexes.key)
+ .AddField<char*>("subkey", &ksvIndexes.subkey)
+ .AddField<char*>("value", &ksvIndexes.value)
+ .Build();
+
+ TResultIndexes resultIndexes;
+ resultIndexes.DictType = builder.Dict()->Key<char*>().Value<char*>().Build();
+ const auto structType = builder.Struct(resultIndexes.FieldsCount)
+ ->AddField<char*>("key", &resultIndexes.key)
+ .AddField<char*>("subkey", &resultIndexes.subkey)
+ .AddField("dict", resultIndexes.DictType, &resultIndexes.dict)
+ .Build();
+
+ builder.Returns(structType)
+ .Args()->Add(recordType).Done()
+ .RunConfig<TOptional<char*>>();
+
+ if (!typesOnly) {
+ builder.Implementation(new TDsvReadRecord::TFactory(
+ resultIndexes, ksvIndexes));
+ }
+ builder.IsStrict();
+ } else if (TStringRef::Of("Parse") == name) {
+ auto optionalStringType = builder.Optional()->Item<char*>().Build();
+ auto dictType = builder.Dict()->Key<char*>().Value<char*>().Build();
+
+ builder.Returns(dictType)
+ .Args()->Add<char*>().Flags(ICallablePayload::TArgumentFlags::AutoMap).Add(optionalStringType).Done()
+ .OptionalArgs(1);
+
+ if (!typesOnly) {
+ builder.Implementation(new TDsvParse(dictType));
+ }
+ builder.IsStrict();
+ } else if (TStringRef::Of("Serialize") == name) {
+ auto typeHelper = builder.TypeInfoHelper();
+ auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType);
+ if (!userTypeInspector || userTypeInspector.GetElementsCount() < 1) {
+ builder.SetError("Expected user type");
+ return;
+ }
+ auto argsTypeTuple = userTypeInspector.GetElementType(0);
+ auto argsTypeInspector = TTupleTypeInspector(*typeHelper, argsTypeTuple);
+ if (!(argsTypeInspector && argsTypeInspector.GetElementsCount() == 1)) {
+ builder.SetError("Only one argument is expected " + ToString(argsTypeInspector.GetElementsCount()));
+ return;
+ }
+
+ TVector<TDataTypeId> typeIds;
+ const auto structType = argsTypeInspector.GetElementType(0);
+ THolder<TStructTypeInspector> structInspector(new TStructTypeInspector(*typeHelper, structType));
+ if (structInspector) {
+ ui32 memberCount = structInspector->GetMembersCount();
+ typeIds.reserve(memberCount);
+
+ if (memberCount) {
+ for (ui32 i = 0; i < memberCount; ++i) {
+ const TString memberName(structInspector->GetMemberName(i));
+ const auto memberType = structInspector->GetMemberType(i);
+ auto memberInspector = TDataTypeInspector(*typeHelper, memberType);
+ if (!memberInspector) {
+ builder.SetError("Only DataType members are supported at the moment, failed at " + memberName);
+ return;
+ }
+ typeIds.push_back(memberInspector.GetTypeId());
+ }
+ } else {
+ builder.SetError("Zero members in input Struct");
+ return;
+ }
+ } else {
+ builder.SetError("Only Structs are supported at the moment");
+ return;
+ }
+
+ builder.UserType(userType).Returns<char*>().Args()->Add(structType).Done();
+
+ if (!typesOnly) {
+ builder.Implementation(new TDsvSerialize(typeIds, structInspector.Release()));
+ }
+ builder.IsStrict();
+
+ }
+ } catch (const std::exception& e) {
+ builder.SetError(CurrentExceptionMessage());
+ }
+};
+
+} // namespace
+
+REGISTER_MODULES(TDsvModule)
diff --git a/yql/essentials/udfs/logs/dsv/ya.make b/yql/essentials/udfs/logs/dsv/ya.make
new file mode 100644
index 00000000000..34e29294233
--- /dev/null
+++ b/yql/essentials/udfs/logs/dsv/ya.make
@@ -0,0 +1,17 @@
+YQL_UDF_CONTRIB(dsv_udf)
+
+YQL_ABI_VERSION(
+ 2
+ 28
+ 0
+)
+
+PEERDIR(
+ library/cpp/deprecated/split
+)
+
+SRCS(
+ dsv_udf.cpp
+)
+
+END()
diff --git a/yql/essentials/udfs/logs/ya.make b/yql/essentials/udfs/logs/ya.make
new file mode 100644
index 00000000000..c725c446469
--- /dev/null
+++ b/yql/essentials/udfs/logs/ya.make
@@ -0,0 +1,3 @@
+RECURSE(
+ dsv
+)
diff --git a/yql/essentials/udfs/ya.make b/yql/essentials/udfs/ya.make
index f94bd2f08b2..4b8f9607379 100644
--- a/yql/essentials/udfs/ya.make
+++ b/yql/essentials/udfs/ya.make
@@ -1,3 +1,4 @@
RECURSE(
examples
+ logs
)