diff options
| author | vvvv <[email protected]> | 2025-10-06 13:26:25 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2025-10-06 14:06:25 +0300 |
| commit | eca8ce9cb1613d5c983185c4e43c20651a9638aa (patch) | |
| tree | 61ee5ae779948e61af9a7691d19eaa2c09869121 /yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp | |
| parent | 4adf7eecae16a9b228b28cc5f64c27ef69ad5ec2 (diff) | |
YQL-20086 udfs
init
commit_hash:f9684778bf1ea956965f2360b80b91edb7d4ffbe
Diffstat (limited to 'yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp')
| -rw-r--r-- | yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp | 751 |
1 files changed, 380 insertions, 371 deletions
diff --git a/yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp b/yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp index 39d17f2ec44..f0e2ad69149 100644 --- a/yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp +++ b/yql/essentials/udfs/common/hyperloglog/hyperloglog_udf.cpp @@ -10,414 +10,423 @@ using namespace NKikimr; using namespace NUdf; namespace { - class THybridHyperLogLog { - private: - using THybridSet = THashSet<ui64, std::hash<ui64>, std::equal_to<ui64>, TStdAllocatorForUdf<ui64>>; - using THybridHll = THyperLogLogWithAlloc<TStdAllocatorForUdf<ui8>>; - - explicit THybridHyperLogLog(unsigned precision) - : Var_(THybridSet()), SizeLimit_((1u << precision) / 8), Precision_(precision) - { } - - THybridHll ConvertToHyperLogLog() const { - auto res = THybridHll::Create(Precision_); - for (auto& el : GetSetRef()) { - res.Update(el); - } - return res; +class THybridHyperLogLog { +private: + using THybridSet = THashSet<ui64, std::hash<ui64>, std::equal_to<ui64>, TStdAllocatorForUdf<ui64>>; + using THybridHll = THyperLogLogWithAlloc<TStdAllocatorForUdf<ui8>>; + + explicit THybridHyperLogLog(unsigned precision) + : Var_(THybridSet()) + , SizeLimit_((1u << precision) / 8) + , Precision_(precision) + { + } + + THybridHll ConvertToHyperLogLog() const { + auto res = THybridHll::Create(Precision_); + for (auto& el : GetSetRef()) { + res.Update(el); } + return res; + } - bool IsSet() const { - return Var_.index() == 1; - } + bool IsSet() const { + return Var_.index() == 1; + } - const THybridSet& GetSetRef() const { - return std::get<1>(Var_); - } + const THybridSet& GetSetRef() const { + return std::get<1>(Var_); + } - THybridSet& GetMutableSetRef() { - return std::get<1>(Var_); - } + THybridSet& GetMutableSetRef() { + return std::get<1>(Var_); + } - const THybridHll& GetHllRef() const { - return std::get<0>(Var_); - } + const THybridHll& GetHllRef() const { + return std::get<0>(Var_); + } - THybridHll& GetMutableHllRef() { - return std::get<0>(Var_); - } + THybridHll& GetMutableHllRef() { + return std::get<0>(Var_); + } - public: - THybridHyperLogLog (THybridHyperLogLog&&) = default; +public: + THybridHyperLogLog(THybridHyperLogLog&&) = default; - THybridHyperLogLog& operator=(THybridHyperLogLog&&) = default; + THybridHyperLogLog& operator=(THybridHyperLogLog&&) = default; - void Update(ui64 hash) { - if (IsSet()) { - GetMutableSetRef().insert(hash); - if (GetSetRef().size() >= SizeLimit_) { - Var_ = ConvertToHyperLogLog(); - } - } else { - GetMutableHllRef().Update(hash); + void Update(ui64 hash) { + if (IsSet()) { + GetMutableSetRef().insert(hash); + if (GetSetRef().size() >= SizeLimit_) { + Var_ = ConvertToHyperLogLog(); } + } else { + GetMutableHllRef().Update(hash); } + } - void Merge(const THybridHyperLogLog& rh) { - if (IsSet() && rh.IsSet()) { - GetMutableSetRef().insert(rh.GetSetRef().begin(), rh.GetSetRef().end()); - if (GetSetRef().size() >= SizeLimit_) { - Var_ = ConvertToHyperLogLog(); - } - } else { - if (IsSet()) { - Var_ = ConvertToHyperLogLog(); - } - if (rh.IsSet()) { - GetMutableHllRef().Merge(rh.ConvertToHyperLogLog()); - } else { - GetMutableHllRef().Merge(rh.GetHllRef()); - } + void Merge(const THybridHyperLogLog& rh) { + if (IsSet() && rh.IsSet()) { + GetMutableSetRef().insert(rh.GetSetRef().begin(), rh.GetSetRef().end()); + if (GetSetRef().size() >= SizeLimit_) { + Var_ = ConvertToHyperLogLog(); } - } - - void Save(IOutputStream& out) const { - out.Write(static_cast<char>(Var_.index())); - out.Write(static_cast<char>(Precision_)); + } else { if (IsSet()) { - ::Save(&out, GetSetRef()); - } else { - GetHllRef().Save(out); + Var_ = ConvertToHyperLogLog(); } - } - - ui64 Estimate() const { - if (IsSet()) { - return GetSetRef().size(); - } - return GetHllRef().Estimate(); - } - - static THybridHyperLogLog Create(unsigned precision) { - Y_ENSURE(precision >= THyperLogLog::PRECISION_MIN && precision <= THyperLogLog::PRECISION_MAX); - return THybridHyperLogLog(precision); - } - - static THybridHyperLogLog Load(IInputStream& in) { - char type; - Y_ENSURE(in.ReadChar(type)); - char precision; - Y_ENSURE(in.ReadChar(precision)); - auto res = Create(precision); - if (type) { - ::Load(&in, res.GetMutableSetRef()); + if (rh.IsSet()) { + GetMutableHllRef().Merge(rh.ConvertToHyperLogLog()); } else { - res.Var_ = THybridHll::Load(in); + GetMutableHllRef().Merge(rh.GetHllRef()); } - return res; } - - private: - std::variant<THybridHll, THybridSet> Var_; - - size_t SizeLimit_; - - unsigned Precision_; - }; - - extern const char HyperLogLogResourceName[] = "HyperLogLog.State"; - - using THyperLogLogResource = TBoxedResource<THybridHyperLogLog, HyperLogLogResourceName>; - - class THyperLogLogCreate: public TBoxedValue { - public: - THyperLogLogCreate(TSourcePosition pos) - : Pos_(pos) - {} - - static const TStringRef& Name() { - static auto nameRef = TStringRef::Of("Create"); - return nameRef; + } + + void Save(IOutputStream& out) const { + out.Write(static_cast<char>(Var_.index())); + out.Write(static_cast<char>(Precision_)); + if (IsSet()) { + ::Save(&out, GetSetRef()); + } else { + GetHllRef().Save(out); } + } - private: - TUnboxedValue Run( - const IValueBuilder*, - const TUnboxedValuePod* args) const override { - try { - THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Create(args[1].Get<ui32>()))); - hll->Get()->Update(args[0].Get<ui64>()); - return TUnboxedValuePod(hll.Release()); - } catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); - } + ui64 Estimate() const { + if (IsSet()) { + return GetSetRef().size(); } - - public: - static bool DeclareSignature( - const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - Y_UNUSED(userType); - if (Name() == name) { - builder.SimpleSignature<TResource<HyperLogLogResourceName>(ui64, ui32)>(); - if (!typesOnly) { - builder.Implementation(new THyperLogLogCreate(builder.GetSourcePosition())); - } - return true; - } else { - return false; - } + return GetHllRef().Estimate(); + } + + static THybridHyperLogLog Create(unsigned precision) { + Y_ENSURE(precision >= THyperLogLog::PRECISION_MIN && precision <= THyperLogLog::PRECISION_MAX); + return THybridHyperLogLog(precision); + } + + static THybridHyperLogLog Load(IInputStream& in) { + char type; + Y_ENSURE(in.ReadChar(type)); + char precision; + Y_ENSURE(in.ReadChar(precision)); + auto res = Create(precision); + if (type) { + ::Load(&in, res.GetMutableSetRef()); + } else { + res.Var_ = THybridHll::Load(in); } - - private: - TSourcePosition Pos_; - }; - - class THyperLogLogAddValue: public TBoxedValue { - public: - THyperLogLogAddValue(TSourcePosition pos) - : Pos_(pos) - {} - - static const TStringRef& Name() { - static auto nameRef = TStringRef::Of("AddValue"); - return nameRef; + return res; + } + +private: + std::variant<THybridHll, THybridSet> Var_; + + size_t SizeLimit_; + + unsigned Precision_; +}; + +extern const char HyperLogLogResourceName[] = "HyperLogLog.State"; + +using THyperLogLogResource = TBoxedResource<THybridHyperLogLog, HyperLogLogResourceName>; + +class THyperLogLogCreate: public TBoxedValue { +public: + THyperLogLogCreate(TSourcePosition pos) + : Pos_(pos) + { + } + + static const TStringRef& Name() { + static auto nameRef = TStringRef::Of("Create"); + return nameRef; + } + +private: + TUnboxedValue Run( + const IValueBuilder*, + const TUnboxedValuePod* args) const override { + try { + THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Create(args[1].Get<ui32>()))); + hll->Get()->Update(args[0].Get<ui64>()); + return TUnboxedValuePod(hll.Release()); + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); } - - private: - TUnboxedValue Run( - const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override { - try { - Y_UNUSED(valueBuilder); - THyperLogLogResource* resource = static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get()); - resource->Get()->Update(args[1].Get<ui64>()); - return TUnboxedValuePod(args[0]); - } catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); + } + +public: + static bool DeclareSignature( + const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + Y_UNUSED(userType); + if (Name() == name) { + builder.SimpleSignature<TResource<HyperLogLogResourceName>(ui64, ui32)>(); + if (!typesOnly) { + builder.Implementation(new THyperLogLogCreate(builder.GetSourcePosition())); } + return true; + } else { + return false; } - - public: - static bool DeclareSignature( - const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - Y_UNUSED(userType); - if (Name() == name) { - builder.SimpleSignature<TResource<HyperLogLogResourceName>(TResource<HyperLogLogResourceName>, ui64)>(); - if (!typesOnly) { - builder.Implementation(new THyperLogLogAddValue(builder.GetSourcePosition())); - } - builder.IsStrict(); - return true; - } else { - return false; - } - } - - private: - TSourcePosition Pos_; - }; - - class THyperLogLogSerialize: public TBoxedValue { - public: - THyperLogLogSerialize(TSourcePosition pos) - : Pos_(pos) - {} - - public: - static const TStringRef& Name() { - static auto nameRef = TStringRef::Of("Serialize"); - return nameRef; + } + +private: + TSourcePosition Pos_; +}; + +class THyperLogLogAddValue: public TBoxedValue { +public: + THyperLogLogAddValue(TSourcePosition pos) + : Pos_(pos) + { + } + + static const TStringRef& Name() { + static auto nameRef = TStringRef::Of("AddValue"); + return nameRef; + } + +private: + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + try { + Y_UNUSED(valueBuilder); + THyperLogLogResource* resource = static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get()); + resource->Get()->Update(args[1].Get<ui64>()); + return TUnboxedValuePod(args[0]); + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); } - - private: - TUnboxedValue Run( - const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override { - try { - TStringStream result; - static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get())->Get()->Save(result); - return valueBuilder->NewString(result.Str()); - } catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); + } + +public: + static bool DeclareSignature( + const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + Y_UNUSED(userType); + if (Name() == name) { + builder.SimpleSignature<TResource<HyperLogLogResourceName>(TResource<HyperLogLogResourceName>, ui64)>(); + if (!typesOnly) { + builder.Implementation(new THyperLogLogAddValue(builder.GetSourcePosition())); } + builder.IsStrict(); + return true; + } else { + return false; } - - public: - static bool DeclareSignature( - const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - Y_UNUSED(userType); - if (Name() == name) { - builder.SimpleSignature<char*(TResource<HyperLogLogResourceName>)>(); - if (!typesOnly) { - builder.Implementation(new THyperLogLogSerialize(builder.GetSourcePosition())); - } - return true; - } else { - return false; - } + } + +private: + TSourcePosition Pos_; +}; + +class THyperLogLogSerialize: public TBoxedValue { +public: + THyperLogLogSerialize(TSourcePosition pos) + : Pos_(pos) + { + } + +public: + static const TStringRef& Name() { + static auto nameRef = TStringRef::Of("Serialize"); + return nameRef; + } + +private: + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + try { + TStringStream result; + static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get())->Get()->Save(result); + return valueBuilder->NewString(result.Str()); + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); } - - private: - TSourcePosition Pos_; - }; - - class THyperLogLogDeserialize: public TBoxedValue { - public: - THyperLogLogDeserialize(TSourcePosition pos) - : Pos_(pos) - {} - - static const TStringRef& Name() { - static auto nameRef = TStringRef::Of("Deserialize"); - return nameRef; - } - - private: - TUnboxedValue Run( - const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override { - try { - Y_UNUSED(valueBuilder); - const TString arg(args[0].AsStringRef()); - TStringInput input(arg); - THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Load(input))); - return TUnboxedValuePod(hll.Release()); - } catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); + } + +public: + static bool DeclareSignature( + const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + Y_UNUSED(userType); + if (Name() == name) { + builder.SimpleSignature<char*(TResource<HyperLogLogResourceName>)>(); + if (!typesOnly) { + builder.Implementation(new THyperLogLogSerialize(builder.GetSourcePosition())); } + return true; + } else { + return false; } - - public: - static bool DeclareSignature( - const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - Y_UNUSED(userType); - if (Name() == name) { - builder.SimpleSignature<TResource<HyperLogLogResourceName>(char*)>(); - if (!typesOnly) { - builder.Implementation(new THyperLogLogDeserialize(builder.GetSourcePosition())); - } - return true; - } else { - return false; - } - } - - private: - TSourcePosition Pos_; - }; - - class THyperLogLogMerge: public TBoxedValue { - public: - THyperLogLogMerge(TSourcePosition pos) - : Pos_(pos) - {} - - static const TStringRef& Name() { - static auto nameRef = TStringRef::Of("Merge"); - return nameRef; - } - - private: - TUnboxedValue Run( - const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override { - try { - Y_UNUSED(valueBuilder); - auto left = static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get())->Get(); - static_cast<THyperLogLogResource*>(args[1].AsBoxed().Get())->Get()->Merge(*left); - return TUnboxedValuePod(args[1]); - } catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); - } + } + +private: + TSourcePosition Pos_; +}; + +class THyperLogLogDeserialize: public TBoxedValue { +public: + THyperLogLogDeserialize(TSourcePosition pos) + : Pos_(pos) + { + } + + static const TStringRef& Name() { + static auto nameRef = TStringRef::Of("Deserialize"); + return nameRef; + } + +private: + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + try { + Y_UNUSED(valueBuilder); + const TString arg(args[0].AsStringRef()); + TStringInput input(arg); + THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Load(input))); + return TUnboxedValuePod(hll.Release()); + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); } - - public: - static bool DeclareSignature( - const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - Y_UNUSED(userType); - if (Name() == name) { - builder.SimpleSignature<TResource<HyperLogLogResourceName>(TResource<HyperLogLogResourceName>, TResource<HyperLogLogResourceName>)>(); - if (!typesOnly) { - builder.Implementation(new THyperLogLogMerge(builder.GetSourcePosition())); - } - builder.IsStrict(); - return true; - } else { - return false; + } + +public: + static bool DeclareSignature( + const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + Y_UNUSED(userType); + if (Name() == name) { + builder.SimpleSignature<TResource<HyperLogLogResourceName>(char*)>(); + if (!typesOnly) { + builder.Implementation(new THyperLogLogDeserialize(builder.GetSourcePosition())); } + return true; + } else { + return false; } - - private: - TSourcePosition Pos_; - }; - - class THyperLogLogGetResult: public TBoxedValue { - public: - THyperLogLogGetResult(TSourcePosition pos) - : Pos_(pos) - {} - - static const TStringRef& Name() { - static auto nameRef = TStringRef::Of("GetResult"); - return nameRef; - } - - private: - TUnboxedValue Run( - const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override { + } + +private: + TSourcePosition Pos_; +}; + +class THyperLogLogMerge: public TBoxedValue { +public: + THyperLogLogMerge(TSourcePosition pos) + : Pos_(pos) + { + } + + static const TStringRef& Name() { + static auto nameRef = TStringRef::Of("Merge"); + return nameRef; + } + +private: + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + try { Y_UNUSED(valueBuilder); - auto hll = static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get())->Get(); - return TUnboxedValuePod(hll->Estimate()); + auto left = static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get())->Get(); + static_cast<THyperLogLogResource*>(args[1].AsBoxed().Get())->Get()->Merge(*left); + return TUnboxedValuePod(args[1]); + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).c_str()); } - - public: - static bool DeclareSignature( - const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - Y_UNUSED(userType); - if (Name() == name) { - auto resource = builder.Resource(HyperLogLogResourceName); - builder.Args()->Add(resource).Done().Returns<ui64>(); - - if (!typesOnly) { - builder.Implementation(new THyperLogLogGetResult(builder.GetSourcePosition())); - } - builder.IsStrict(); - return true; - } else { - return false; + } + +public: + static bool DeclareSignature( + const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + Y_UNUSED(userType); + if (Name() == name) { + builder.SimpleSignature<TResource<HyperLogLogResourceName>(TResource<HyperLogLogResourceName>, TResource<HyperLogLogResourceName>)>(); + if (!typesOnly) { + builder.Implementation(new THyperLogLogMerge(builder.GetSourcePosition())); } + builder.IsStrict(); + return true; + } else { + return false; } - - private: - TSourcePosition Pos_; - }; - - SIMPLE_MODULE(THyperLogLogModule, - THyperLogLogCreate, - THyperLogLogAddValue, - THyperLogLogSerialize, - THyperLogLogDeserialize, - THyperLogLogMerge, - THyperLogLogGetResult) -} + } + +private: + TSourcePosition Pos_; +}; + +class THyperLogLogGetResult: public TBoxedValue { +public: + THyperLogLogGetResult(TSourcePosition pos) + : Pos_(pos) + { + } + + static const TStringRef& Name() { + static auto nameRef = TStringRef::Of("GetResult"); + return nameRef; + } + +private: + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + Y_UNUSED(valueBuilder); + auto hll = static_cast<THyperLogLogResource*>(args[0].AsBoxed().Get())->Get(); + return TUnboxedValuePod(hll->Estimate()); + } + +public: + static bool DeclareSignature( + const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + Y_UNUSED(userType); + if (Name() == name) { + auto resource = builder.Resource(HyperLogLogResourceName); + builder.Args()->Add(resource).Done().Returns<ui64>(); + + if (!typesOnly) { + builder.Implementation(new THyperLogLogGetResult(builder.GetSourcePosition())); + } + builder.IsStrict(); + return true; + } else { + return false; + } + } + +private: + TSourcePosition Pos_; +}; + +SIMPLE_MODULE(THyperLogLogModule, + THyperLogLogCreate, + THyperLogLogAddValue, + THyperLogLogSerialize, + THyperLogLogDeserialize, + THyperLogLogMerge, + THyperLogLogGetResult) +} // namespace REGISTER_MODULES(THyperLogLogModule) |
