diff options
author | v-mak33v <v-mak33v@yandex-team.ru> | 2022-02-10 16:50:50 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:50 +0300 |
commit | fd57eb4948385b403f53b2f711399e516d46dcac (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 | |
parent | b563943b467b077146b7f7dab9d0d31f786d1515 (diff) | |
download | ydb-fd57eb4948385b403f53b2f711399e516d46dcac.tar.gz |
Restoring authorship annotation for <v-mak33v@yandex-team.ru>. Commit 2 of 2.
22 files changed, 505 insertions, 505 deletions
diff --git a/library/cpp/hyperloglog/hyperloglog.cpp b/library/cpp/hyperloglog/hyperloglog.cpp index 99ee73972f..ec8352abe1 100644 --- a/library/cpp/hyperloglog/hyperloglog.cpp +++ b/library/cpp/hyperloglog/hyperloglog.cpp @@ -87,12 +87,12 @@ namespace { } } - double RawEstimate(const ui8* counts, size_t size) { + double RawEstimate(const ui8* counts, size_t size) { double sum = {}; - for (size_t i = 0; i < size; ++i) { - sum += std::pow(2.0, -counts[i]); + for (size_t i = 0; i < size; ++i) { + sum += std::pow(2.0, -counts[i]); } - return EmpiricAlpha(size) * size * size / sum; + return EmpiricAlpha(size) * size * size / sum; } double LinearCounting(size_t registers, size_t zeroed) { @@ -100,38 +100,38 @@ namespace { } } -THyperLogLogBase::THyperLogLogBase(unsigned precision) - : Precision(precision) { +THyperLogLogBase::THyperLogLogBase(unsigned precision) + : Precision(precision) { Y_ENSURE(precision >= PRECISION_MIN && precision <= PRECISION_MAX); } -void THyperLogLogBase::Update(ui64 hash) { +void THyperLogLogBase::Update(ui64 hash) { const unsigned subHashBits = 8 * sizeof(hash) - Precision; const auto subHash = hash & MaskLowerBits(subHashBits); const auto leadingZeroes = subHash ? (subHashBits - GetValueBitCount(subHash)) : subHashBits; const ui8 weight = static_cast<ui8>(leadingZeroes + 1); const size_t reg = static_cast<size_t>(hash >> subHashBits); - RegistersRef[reg] = std::max(RegistersRef[reg], weight); + RegistersRef[reg] = std::max(RegistersRef[reg], weight); } -void THyperLogLogBase::Merge(const THyperLogLogBase& rh) { +void THyperLogLogBase::Merge(const THyperLogLogBase& rh) { Y_ENSURE(Precision == rh.Precision); - std::transform(RegistersRef.begin(), RegistersRef.end(), rh.RegistersRef.begin(), RegistersRef.begin(), [](ui8 l, ui8 r) { return std::max(l, r); }); + std::transform(RegistersRef.begin(), RegistersRef.end(), rh.RegistersRef.begin(), RegistersRef.begin(), [](ui8 l, ui8 r) { return std::max(l, r); }); } -ui64 THyperLogLogBase::Estimate() const { - const auto m = RegistersRef.size(); - const auto e = RawEstimate(RegistersRef.data(), m); +ui64 THyperLogLogBase::Estimate() const { + const auto m = RegistersRef.size(); + const auto e = RawEstimate(RegistersRef.data(), m); const auto e_ = e <= 5 * m ? (e - EstimateBias(e, Precision)) : e; - const auto v = std::count(RegistersRef.begin(), RegistersRef.end(), ui8(0)); + const auto v = std::count(RegistersRef.begin(), RegistersRef.end(), ui8(0)); const auto h = v != 0 ? LinearCounting(m, v) : e_; return h <= GetThreshold(Precision) ? h : e_; } -void THyperLogLogBase::Save(IOutputStream& out) const { +void THyperLogLogBase::Save(IOutputStream& out) const { out.Write(static_cast<char>(Precision)); - out.Write(RegistersRef.data(), RegistersRef.size() * sizeof(RegistersRef.front())); + out.Write(RegistersRef.data(), RegistersRef.size() * sizeof(RegistersRef.front())); } diff --git a/library/cpp/hyperloglog/hyperloglog.h b/library/cpp/hyperloglog/hyperloglog.h index 2b9725dc6e..e79ee0ed77 100644 --- a/library/cpp/hyperloglog/hyperloglog.h +++ b/library/cpp/hyperloglog/hyperloglog.h @@ -1,64 +1,64 @@ #pragma once #include <util/system/types.h> -#include <util/stream/input.h> -#include <util/generic/array_ref.h> +#include <util/stream/input.h> +#include <util/generic/array_ref.h> #include <vector> class IOutputStream; -class THyperLogLogBase { -protected: - explicit THyperLogLogBase(unsigned precision); +class THyperLogLogBase { +protected: + explicit THyperLogLogBase(unsigned precision); public: static const constexpr unsigned PRECISION_MIN = 4; - + static const constexpr unsigned PRECISION_MAX = 18; void Update(ui64 hash); - void Merge(const THyperLogLogBase& rh); + void Merge(const THyperLogLogBase& rh); ui64 Estimate() const; void Save(IOutputStream& out) const; -protected: - unsigned Precision; - - TArrayRef<ui8> RegistersRef; -}; - -template <typename Alloc> -class THyperLogLogWithAlloc : public THyperLogLogBase { -private: - explicit THyperLogLogWithAlloc(unsigned precision) - : THyperLogLogBase(precision) { - Registers.resize(1u << precision); - RegistersRef = MakeArrayRef(Registers); +protected: + unsigned Precision; + + TArrayRef<ui8> RegistersRef; +}; + +template <typename Alloc> +class THyperLogLogWithAlloc : public THyperLogLogBase { +private: + explicit THyperLogLogWithAlloc(unsigned precision) + : THyperLogLogBase(precision) { + Registers.resize(1u << precision); + RegistersRef = MakeArrayRef(Registers); + } + +public: + THyperLogLogWithAlloc(THyperLogLogWithAlloc&&) = default; + + THyperLogLogWithAlloc& operator=(THyperLogLogWithAlloc&&) = default; + + static THyperLogLogWithAlloc Create(unsigned precision) { + return THyperLogLogWithAlloc(precision); + } + + static THyperLogLogWithAlloc Load(IInputStream& in) { + char precision = {}; + Y_ENSURE(in.ReadChar(precision)); + auto res = Create(precision); + in.LoadOrFail(res.Registers.data(), res.Registers.size() * sizeof(res.Registers.front())); + return res; } -public: - THyperLogLogWithAlloc(THyperLogLogWithAlloc&&) = default; - - THyperLogLogWithAlloc& operator=(THyperLogLogWithAlloc&&) = default; - - static THyperLogLogWithAlloc Create(unsigned precision) { - return THyperLogLogWithAlloc(precision); - } - - static THyperLogLogWithAlloc Load(IInputStream& in) { - char precision = {}; - Y_ENSURE(in.ReadChar(precision)); - auto res = Create(precision); - in.LoadOrFail(res.Registers.data(), res.Registers.size() * sizeof(res.Registers.front())); - return res; - } - private: - std::vector<ui8, Alloc> Registers; + std::vector<ui8, Alloc> Registers; }; - -using THyperLogLog = THyperLogLogWithAlloc<std::allocator<ui8>>; + +using THyperLogLog = THyperLogLogWithAlloc<std::allocator<ui8>>; diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index af04cdd200..b74abc1c34 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -1120,29 +1120,29 @@ TMaybe<TString> TProgram::GetTasksInfo() { } TMaybe<TString> TProgram::GetStatistics(bool totalOnly) { - if (!TypeCtx_) { - return Nothing(); - } - - TStringStream out; + if (!TypeCtx_) { + return Nothing(); + } + + TStringStream out; NYson::TYsonWriter writer(&out); // Header - writer.OnBeginMap(); - writer.OnKeyedItem("ExecutionStatistics"); - writer.OnBeginMap(); - + writer.OnBeginMap(); + writer.OnKeyedItem("ExecutionStatistics"); + writer.OnBeginMap(); + // Providers - bool hasStatistics = false; - for (auto& datasink : TypeCtx_->DataSinks) { + bool hasStatistics = false; + for (auto& datasink : TypeCtx_->DataSinks) { TStringStream providerOut; NYson::TYsonWriter providerWriter(&providerOut); if (datasink->CollectStatistics(providerWriter, totalOnly)) { - writer.OnKeyedItem(datasink->GetName()); + writer.OnKeyedItem(datasink->GetName()); writer.OnRaw(providerOut.Str()); hasStatistics = true; - } - } - + } + } + auto rusage = TRusage::Get(); // System stats writer.OnKeyedItem("system"); @@ -1161,14 +1161,14 @@ TMaybe<TString> TProgram::GetStatistics(bool totalOnly) { writer.OnEndMap(); // system // Footer - writer.OnEndMap(); - writer.OnEndMap(); - if (hasStatistics) { - return out.Str(); - } - return Nothing(); + writer.OnEndMap(); + writer.OnEndMap(); + if (hasStatistics) { + return out.Str(); + } + return Nothing(); } - + TMaybe<TString> TProgram::GetDiscoveredData() { if (!TypeCtx_) { return Nothing(); @@ -1187,8 +1187,8 @@ TMaybe<TString> TProgram::GetDiscoveredData() { } writer.OnEndMap(); return out.Str(); -} - +} + TProgram::TFutureStatus TProgram::ContinueAsync() { YQL_LOG_CTX_ROOT_SCOPE(GetSessionId()); return AsyncTransformWithFallback(true); diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index d1aef7a3bc..1f8f47681d 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -218,7 +218,7 @@ public: TMaybe<TString> GetTasksInfo(); TMaybe<TString> GetStatistics(bool totalOnly = false); - + TMaybe<TString> GetDiscoveredData(); TString ResultsAsString() const; @@ -290,8 +290,8 @@ public: SupportsResultPosition_ = true; } - IPlanBuilder& GetPlanBuilder(); - + IPlanBuilder& GetPlanBuilder(); + private: TProgram( const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 6b93f80169..ce22b12571 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -5960,12 +5960,12 @@ THolder<IGraphTransformer> MakePeepholeOptimization(TTypeAnnotationContextPtr ty peepholeSettings.CommonConfig = peepholeSettings.FinalConfig = config; auto commonTransformer = CreatePeepHoleCommonStageTransformer<true>(*typeAnnotationContext, nullptr, peepholeSettings); auto finalTransformer = CreatePeepHoleFinalStageTransformer<true>(*typeAnnotationContext, nullptr, nullptr, peepholeSettings); - return CreateFunctorTransformer( + return CreateFunctorTransformer( [common = std::move(commonTransformer), final = std::move(finalTransformer)](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus { return DoPeepHoleOptimizeNode(input, output, ctx, *common, *final); - }); + }); } - + template IGraphTransformer::TStatus PeepHoleOptimizeNode<true>(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& types, IGraphTransformer* typeAnnotator, bool& hasNonDeterministicFunctions, const TPeepholeSettings& peepholeSettings); @@ -5973,4 +5973,4 @@ template IGraphTransformer::TStatus PeepHoleOptimizeNode<true>(const TExprNode:: template IGraphTransformer::TStatus PeepHoleOptimizeNode<false>(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& types, IGraphTransformer* typeAnnotator, bool& hasNonDeterministicFunctions, const TPeepholeSettings& peepholeSettings); -} +} diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h index 55a8b966c8..a357c5b395 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h @@ -18,7 +18,7 @@ template <bool EnableNewOptimizers> IGraphTransformer::TStatus PeepHoleOptimizeNode(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& types, IGraphTransformer* typeAnnotator, bool& hasNonDeterministicFunctions, const TPeepholeSettings& peepholeSettings = {}); - + THolder<IGraphTransformer> MakePeepholeOptimization(TTypeAnnotationContextPtr typeAnnotationContext, const IPipelineConfigurator* config = nullptr); - + } diff --git a/ydb/library/yql/core/services/ya.make b/ydb/library/yql/core/services/ya.make index 2b78325519..5cebdea948 100644 --- a/ydb/library/yql/core/services/ya.make +++ b/ydb/library/yql/core/services/ya.make @@ -11,8 +11,8 @@ SRCS( yql_eval_expr.h yql_eval_params.cpp yql_eval_params.h - yql_out_transformers.cpp - yql_out_transformers.h + yql_out_transformers.cpp + yql_out_transformers.h yql_plan.cpp yql_plan.h yql_transform_pipeline.cpp diff --git a/ydb/library/yql/core/services/yql_out_transformers.cpp b/ydb/library/yql/core/services/yql_out_transformers.cpp index e1ec9d375c..d8363095f8 100644 --- a/ydb/library/yql/core/services/yql_out_transformers.cpp +++ b/ydb/library/yql/core/services/yql_out_transformers.cpp @@ -1,52 +1,52 @@ -#include "yql_out_transformers.h" - +#include "yql_out_transformers.h" + #include <ydb/library/yql/ast/yql_expr.h> - + #include <library/cpp/yson/writer.h> - -#include <util/stream/null.h> + +#include <util/stream/null.h> #include <util/stream/str.h> - -namespace NYql { - -IGraphTransformer::TStatus TExprOutputTransformer::operator()( - const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -{ - Y_UNUSED(ctx); - output = input; - if (!ExprRoot_) { - return IGraphTransformer::TStatus::Ok; - } - + +namespace NYql { + +IGraphTransformer::TStatus TExprOutputTransformer::operator()( + const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) +{ + Y_UNUSED(ctx); + output = input; + if (!ExprRoot_) { + return IGraphTransformer::TStatus::Ok; + } + auto ast = ConvertToAst(*ExprRoot_, ctx, WithTypes_ ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true); - ui32 prettyFlags = TAstPrintFlags::ShortQuote; - if (!WithTypes_) { - prettyFlags |= TAstPrintFlags::PerLine; - } - - if (DirectOut_) { - ast.Root->PrettyPrintTo(*DirectOut_, prettyFlags); - } - return IGraphTransformer::TStatus::Ok; -} - -IGraphTransformer::TStatus TPlanOutputTransformer::operator()( - const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -{ - Y_UNUSED(ctx); - output = input; - if (DirectOut_) { + ui32 prettyFlags = TAstPrintFlags::ShortQuote; + if (!WithTypes_) { + prettyFlags |= TAstPrintFlags::PerLine; + } + + if (DirectOut_) { + ast.Root->PrettyPrintTo(*DirectOut_, prettyFlags); + } + return IGraphTransformer::TStatus::Ok; +} + +IGraphTransformer::TStatus TPlanOutputTransformer::operator()( + const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) +{ + Y_UNUSED(ctx); + output = input; + if (DirectOut_) { NYson::TYsonWriter writer(DirectOut_, OutputFormat_); - Builder_.WritePlan(writer, input); - } else { - TNullOutput null; + Builder_.WritePlan(writer, input); + } else { + TNullOutput null; NYson::TYsonWriter writer(&null, OutputFormat_); - Builder_.WritePlan(writer, input); - } - - return IGraphTransformer::TStatus::Ok; -} - + Builder_.WritePlan(writer, input); + } + + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus TExprLogTransformer::operator()( const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { diff --git a/ydb/library/yql/core/services/yql_out_transformers.h b/ydb/library/yql/core/services/yql_out_transformers.h index e08d4d0115..d90f88cd99 100644 --- a/ydb/library/yql/core/services/yql_out_transformers.h +++ b/ydb/library/yql/core/services/yql_out_transformers.h @@ -1,69 +1,69 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/core/services/yql_transform_pipeline.h> #include <ydb/library/yql/core/services/yql_plan.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/yql_type_annotation.h> #include <ydb/library/yql/core/yql_graph_transformer.h> - + #include <library/cpp/yson/public.h> - -#include <util/stream/output.h> -#include <util/generic/ptr.h> - -namespace NYql { - -class TExprOutputTransformer { -public: - TExprOutputTransformer(const TExprNode::TPtr& exprRoot, IOutputStream* directOut, bool withTypes) - : ExprRoot_(exprRoot), DirectOut_(directOut), WithTypes_(withTypes) - { - } - - IGraphTransformer::TStatus operator()(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx); - - static TAutoPtr<IGraphTransformer> Sync( - const TExprNode::TPtr& exprRoot, - IOutputStream* directOut, - bool withTypes = false) - { - return directOut ? CreateFunctorTransformer(TExprOutputTransformer(exprRoot, directOut, withTypes)) : nullptr; - } - -private: - const TExprNode::TPtr &ExprRoot_; - IOutputStream *DirectOut_; - bool WithTypes_; -}; - -class TPlanOutputTransformer { -public: - TPlanOutputTransformer( + +#include <util/stream/output.h> +#include <util/generic/ptr.h> + +namespace NYql { + +class TExprOutputTransformer { +public: + TExprOutputTransformer(const TExprNode::TPtr& exprRoot, IOutputStream* directOut, bool withTypes) + : ExprRoot_(exprRoot), DirectOut_(directOut), WithTypes_(withTypes) + { + } + + IGraphTransformer::TStatus operator()(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx); + + static TAutoPtr<IGraphTransformer> Sync( + const TExprNode::TPtr& exprRoot, + IOutputStream* directOut, + bool withTypes = false) + { + return directOut ? CreateFunctorTransformer(TExprOutputTransformer(exprRoot, directOut, withTypes)) : nullptr; + } + +private: + const TExprNode::TPtr &ExprRoot_; + IOutputStream *DirectOut_; + bool WithTypes_; +}; + +class TPlanOutputTransformer { +public: + TPlanOutputTransformer( IOutputStream* directOut, - IPlanBuilder& builder, + IPlanBuilder& builder, NYson::EYsonFormat outputFormat) : DirectOut_(directOut) - , Builder_(builder) - , OutputFormat_(outputFormat) - { - } - - IGraphTransformer::TStatus operator()(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx); - - static TAutoPtr <IGraphTransformer> Sync( + , Builder_(builder) + , OutputFormat_(outputFormat) + { + } + + IGraphTransformer::TStatus operator()(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx); + + static TAutoPtr <IGraphTransformer> Sync( IOutputStream* directOut, IPlanBuilder& builder, NYson::EYsonFormat outputFormat) - { + { return CreateFunctorTransformer(TPlanOutputTransformer(directOut, builder, outputFormat)); - } - -private: + } + +private: IOutputStream* DirectOut_; - IPlanBuilder& Builder_; + IPlanBuilder& Builder_; NYson::EYsonFormat OutputFormat_; -}; - +}; + class TExprLogTransformer { public: TExprLogTransformer(const TString& description, NYql::NLog::EComponent component, NYql::NLog::ELevel level) diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp index 13a9b332d6..ae425f34c8 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp +++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp @@ -235,9 +235,9 @@ TAutoPtr<IGraphTransformer> TTransformationPipeline::BuildWithNoArgChecks(bool u return CreateCompositeGraphTransformerWithNoArgChecks(Transformers_, useIssueScopes); } -TIntrusivePtr<TTypeAnnotationContext> TTransformationPipeline::GetTypeAnnotationContext() const { - return TypeAnnotationContext_; -} - - +TIntrusivePtr<TTypeAnnotationContext> TTransformationPipeline::GetTypeAnnotationContext() const { + return TypeAnnotationContext_; +} + + } // namespace NYql diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.h b/ydb/library/yql/core/services/yql_transform_pipeline.h index 4ed0c3b91b..7e2f5c911e 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.h +++ b/ydb/library/yql/core/services/yql_transform_pipeline.h @@ -50,8 +50,8 @@ public: TAutoPtr<IGraphTransformer> Build(bool useIssueScopes = true); TAutoPtr<IGraphTransformer> BuildWithNoArgChecks(bool useIssueScopes = true); - TIntrusivePtr<TTypeAnnotationContext> GetTypeAnnotationContext() const; - + TIntrusivePtr<TTypeAnnotationContext> GetTypeAnnotationContext() const; + private: TIntrusivePtr<TTypeAnnotationContext> TypeAnnotationContext_; TVector<TTransformStage> Transformers_; diff --git a/ydb/library/yql/core/yql_execution.cpp b/ydb/library/yql/core/yql_execution.cpp index 9d2295fb82..eb138b1b4f 100644 --- a/ydb/library/yql/core/yql_execution.cpp +++ b/ydb/library/yql/core/yql_execution.cpp @@ -508,8 +508,8 @@ public: : TOperationProgress::EState::Failed; if (progIt->second.State != newState) { - TString stage = progIt->second.Stage.first; - progIt->second = TOperationProgress(TString(category), *publicId, newState, stage); + TString stage = progIt->second.Stage.first; + progIt->second = TOperationProgress(TString(category), *publicId, newState, stage); Writer(progIt->second); } } diff --git a/ydb/library/yql/core/yql_execution.h b/ydb/library/yql/core/yql_execution.h index 587d469d72..cb38d913fa 100644 --- a/ydb/library/yql/core/yql_execution.h +++ b/ydb/library/yql/core/yql_execution.h @@ -22,9 +22,9 @@ namespace NYql { ui32 Id; EState State; - using TStage = std::pair<TString, TInstant>; - TStage Stage; - + using TStage = std::pair<TString, TInstant>; + TStage Stage; + TString RemoteId; struct TCounters { @@ -52,27 +52,27 @@ namespace NYql { TMaybe<TCounters> Counters; - TOperationProgress(const TString& category, ui32 id, - EState state, const TString& stage = "") + TOperationProgress(const TString& category, ui32 id, + EState state, const TString& stage = "") : Category(category) , Id(id) , State(state) - , Stage(stage, TInstant::Now()) + , Stage(stage, TInstant::Now()) { } }; - struct TOperationStatistics { - struct TEntry { - TString Name; - - TMaybe<i64> Sum; - TMaybe<i64> Max; - TMaybe<i64> Min; - TMaybe<i64> Avg; + struct TOperationStatistics { + struct TEntry { + TString Name; + + TMaybe<i64> Sum; + TMaybe<i64> Max; + TMaybe<i64> Min; + TMaybe<i64> Avg; TMaybe<i64> Count; TMaybe<TString> Value; - + TEntry(TString name, TMaybe<i64> sum, TMaybe<i64> max, TMaybe<i64> min, TMaybe<i64> avg, TMaybe<i64> count) : Name(std::move(name)) , Sum(std::move(sum)) @@ -80,37 +80,37 @@ namespace NYql { , Min(std::move(min)) , Avg(std::move(avg)) , Count(std::move(count)) - { - } + { + } TEntry(TString name, TString value) : Name(std::move(name)) , Value(std::move(value)) { } - }; - - TVector<TEntry> Entries; - }; - - using TStatWriter = std::function<void(ui32, const TVector<TOperationStatistics::TEntry>&)>; + }; + + TVector<TEntry> Entries; + }; + + using TStatWriter = std::function<void(ui32, const TVector<TOperationStatistics::TEntry>&)>; using TOperationProgressWriter = std::function<void(const TOperationProgress&)>; - inline TStatWriter ThreadSafeStatWriter(TStatWriter base) { - struct TState : public TThrRefBase { - TStatWriter Base; - TMutex Mutex; - }; - - auto state = MakeIntrusive<TState>(); - state->Base = base; - return [state](ui32 id, const TVector<TOperationStatistics::TEntry>& stat) { - with_lock(state->Mutex) { - state->Base(id, stat); - } - }; - } - + inline TStatWriter ThreadSafeStatWriter(TStatWriter base) { + struct TState : public TThrRefBase { + TStatWriter Base; + TMutex Mutex; + }; + + auto state = MakeIntrusive<TState>(); + state->Base = base; + return [state](ui32 id, const TVector<TOperationStatistics::TEntry>& stat) { + with_lock(state->Mutex) { + state->Base(id, stat); + } + }; + } + inline void NullProgressWriter(const TOperationProgress& progress) { Y_UNUSED(progress); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_replicate.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_replicate.cpp index f32e28a9c2..44489b6266 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_replicate.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_replicate.cpp @@ -199,7 +199,7 @@ public: NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { const auto count = Count->GetValue(ctx).Get<ui64>(); - const ui64 MAX_VALUE = 1ull << 32; + const ui64 MAX_VALUE = 1ull << 32; if (count >= MAX_VALUE) { TStringBuilder res; res << Pos << " Second argument in ListReplicate = " << count << " exceeds maximum value = " << MAX_VALUE; diff --git a/ydb/library/yql/minikql/mkql_type_ops.cpp b/ydb/library/yql/minikql/mkql_type_ops.cpp index db021d08ec..d559bb108c 100644 --- a/ydb/library/yql/minikql/mkql_type_ops.cpp +++ b/ydb/library/yql/minikql/mkql_type_ops.cpp @@ -175,8 +175,8 @@ ui32 GetMonthLength(ui32 month, bool isLeap) { } } -namespace { - +namespace { + ui32 LeapDaysSinceEpoch(ui32 yearsSinceEpoch) { ui32 leapDaysCount = (yearsSinceEpoch + 1) / 4; if (yearsSinceEpoch >= 131) { @@ -857,8 +857,8 @@ bool MakeTzDatetime(ui32 year, ui32 month, ui32 day, ui32 hour, ui32 min, ui32 s if (tzId) { const auto& tz = Singleton<TTimezones>()->GetZone(tzId); cctz::civil_second cs(year, month, day, hour, min, sec); - auto utcSeconds = cctz::TimePointToUnixSeconds(tz.lookup(cs).pre); - if (utcSeconds < 0 || utcSeconds >= (std::int_fast64_t) NUdf::MAX_DATETIME) { + auto utcSeconds = cctz::TimePointToUnixSeconds(tz.lookup(cs).pre); + if (utcSeconds < 0 || utcSeconds >= (std::int_fast64_t) NUdf::MAX_DATETIME) { return false; } diff --git a/ydb/library/yql/minikql/mkql_type_ops.h b/ydb/library/yql/minikql/mkql_type_ops.h index 0dc1270b8a..8ff9b258dd 100644 --- a/ydb/library/yql/minikql/mkql_type_ops.h +++ b/ydb/library/yql/minikql/mkql_type_ops.h @@ -15,10 +15,10 @@ TStringBuf AdaptLegacyYqlType(const TStringBuf& type); bool IsValidValue(NUdf::EDataSlot type, const NUdf::TUnboxedValuePod& value); -bool IsLeapYear(ui32 year); - -ui32 GetMonthLength(ui32 month, bool isLeap); - +bool IsLeapYear(ui32 year); + +ui32 GetMonthLength(ui32 month, bool isLeap); + void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out); bool IsValidStringValue(NUdf::EDataSlot type, NUdf::TStringRef buf); diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp index 99b72ac16d..8de3d57fd3 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp @@ -264,11 +264,11 @@ bool TDataProviderBase::GetTasksInfo(NYson::TYsonWriter& writer) { } bool TDataProviderBase::CollectStatistics(NYson::TYsonWriter& writer, bool totalOnly) { - Y_UNUSED(writer); + Y_UNUSED(writer); Y_UNUSED(totalOnly); - return false; -} - + return false; +} + bool TDataProviderBase::CollectDiscoveredData(NYson::TYsonWriter& writer) { Y_UNUSED(writer); return false; diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp index 85c110bbd9..870bbd9cc7 100644 --- a/ydb/library/yql/sql/v1/node.cpp +++ b/ydb/library/yql/sql/v1/node.cpp @@ -1714,17 +1714,17 @@ TNodePtr ISource::BuildPrewindowMap(TContext& ctx) { return Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Y("AsList", feed))); } -bool ISource::BuildSamplingLambda(TNodePtr& node) { - if (!SamplingRate) { - return true; - } +bool ISource::BuildSamplingLambda(TNodePtr& node) { + if (!SamplingRate) { + return true; + } auto res = Y("Coalesce", Y("SafeCast", SamplingRate, Y("DataType", Q("Double"))), Y("Double", Q("0"))); - res = Y("/", res, Y("Double", Q("100"))); - res = Y(Y("let", "res", Y("OptionalIf", Y("<", Y("Random", Y("DependsOn", "row")), res), "row"))); - node = BuildLambda(GetPos(), Y("row"), res, "res"); - return !!node; -} - + res = Y("/", res, Y("Double", Q("100"))); + res = Y(Y("let", "res", Y("OptionalIf", Y("<", Y("Random", Y("DependsOn", "row")), res), "row"))); + node = BuildLambda(GetPos(), Y("row"), res, "res"); + return !!node; +} + bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) { if (samplingRate) { if (!samplingRate->Init(ctx, this)) { @@ -1732,10 +1732,10 @@ bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) { } SamplingRate = Y("Ensure", samplingRate, Y(">=", samplingRate, Y("Double", Q("0"))), Y("String", Q("\"Expected sampling rate to be nonnegative\""))); SamplingRate = Y("Ensure", SamplingRate, Y("<=", SamplingRate, Y("Double", Q("100"))), Y("String", Q("\"Sampling rate is over 100%\""))); - } + } return true; -} - +} + TNodePtr ISource::BuildAggregation(const TString& label) { if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource()) { return nullptr; diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index 9e408b214a..de82b45a23 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -859,7 +859,7 @@ namespace NSQLTranslationV1 { virtual TNodePtr BuildCalcOverWindow(TContext& ctx, const TString& label); virtual TNodePtr BuildSort(TContext& ctx, const TString& label); virtual TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label); - virtual bool BuildSamplingLambda(TNodePtr& node); + virtual bool BuildSamplingLambda(TNodePtr& node); virtual bool SetSamplingRate(TContext& ctx, TNodePtr samplingRate); virtual IJoin* GetJoin(); virtual ISource* GetCompositeSource(); @@ -917,7 +917,7 @@ namespace NSQLTranslationV1 { bool FlattenColumns = false; THashMap<TString, ui32> GenIndexes; TVector<TString> TmpWindowColumns; - TNodePtr SamplingRate; + TNodePtr SamplingRate; }; template<> diff --git a/ydb/library/yql/sql/v1/select.cpp b/ydb/library/yql/sql/v1/select.cpp index 45a0f73377..2841f05a5b 100644 --- a/ydb/library/yql/sql/v1/select.cpp +++ b/ydb/library/yql/sql/v1/select.cpp @@ -583,12 +583,12 @@ public: return false; } } - TNodePtr sample; - if (!BuildSamplingLambda(sample)) { - return false; - } else if (sample) { - Node = Y("block", Q(Y(Y("let", Node, Y("OrderedFlatMap", Node, sample)), Y("return", Node)))); - } + TNodePtr sample; + if (!BuildSamplingLambda(sample)) { + return false; + } else if (sample) { + Node = Y("block", Q(Y(Y("let", Node, Y("OrderedFlatMap", Node, sample)), Y("return", Node)))); + } return true; } @@ -597,23 +597,23 @@ public: return Node; } - bool SetSamplingOptions( - TContext& ctx, - TPosition pos, - ESampleMode mode, - TNodePtr samplingRate, - TNodePtr samplingSeed) override { - if (mode != ESampleMode::Bernoulli) { - ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries"; - return false; - } - if (samplingSeed) { - ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries"; - return false; - } + bool SetSamplingOptions( + TContext& ctx, + TPosition pos, + ESampleMode mode, + TNodePtr samplingRate, + TNodePtr samplingSeed) override { + if (mode != ESampleMode::Bernoulli) { + ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries"; + return false; + } + if (samplingSeed) { + ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries"; + return false; + } return SetSamplingRate(ctx, samplingRate); - } - + } + bool IsStream() const override { return Subquery->GetSource()->IsStream(); } @@ -2755,13 +2755,13 @@ public: if (SkipTake) { block = L(block, SkipTake); } - - TNodePtr sample; - if (!BuildSamplingLambda(sample)) { - return nullptr; - } else if (sample) { - block = L(block, Y("let", "select", Y("OrderedFlatMap", "select", sample))); - } + + TNodePtr sample; + if (!BuildSamplingLambda(sample)) { + return nullptr; + } else if (sample) { + block = L(block, Y("let", "select", Y("OrderedFlatMap", "select", sample))); + } if (auto removeNode = Source->BuildCleanupColumns(ctx, label)) { block = L(block, removeNode); @@ -2771,23 +2771,23 @@ public: return Y("block", Q(block)); } - bool SetSamplingOptions( - TContext& ctx, - TPosition pos, - ESampleMode mode, - TNodePtr samplingRate, - TNodePtr samplingSeed) override { - if (mode != ESampleMode::Bernoulli) { - ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries"; - return false; - } - if (samplingSeed) { - ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries"; - return false; - } + bool SetSamplingOptions( + TContext& ctx, + TPosition pos, + ESampleMode mode, + TNodePtr samplingRate, + TNodePtr samplingSeed) override { + if (mode != ESampleMode::Bernoulli) { + ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries"; + return false; + } + if (samplingSeed) { + ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries"; + return false; + } return SetSamplingRate(ctx, samplingRate); - } - + } + bool IsSelect() const override { return Source->IsSelect(); } diff --git a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp index 2897684ee9..763fea8024 100644 --- a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp +++ b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp @@ -278,14 +278,14 @@ namespace { return false; } - inline bool ValidateDatetime(ui32 datetime) { - return datetime < MAX_DATETIME; - } - - inline bool ValidateTimestamp(ui64 timestamp) { - return timestamp < MAX_TIMESTAMP; - } - + inline bool ValidateDatetime(ui32 datetime) { + return datetime < MAX_DATETIME; + } + + inline bool ValidateTimestamp(ui64 timestamp) { + return timestamp < MAX_TIMESTAMP; + } + inline bool ValidateInterval(i64 interval) { return interval > -i64(MAX_TIMESTAMP) && interval < i64(MAX_TIMESTAMP); } @@ -676,31 +676,31 @@ namespace { // From* - SIMPLE_UDF(TFromSeconds, TOptional<TTimestamp>(TAutoMap<ui32>)) { + SIMPLE_UDF(TFromSeconds, TOptional<TTimestamp>(TAutoMap<ui32>)) { Y_UNUSED(valueBuilder); - auto res = args[0].Get<ui32>(); - if (!ValidateDatetime(res)) { - return TUnboxedValuePod(); - } - return TUnboxedValuePod((ui64)(res * 1000000ull)); + auto res = args[0].Get<ui32>(); + if (!ValidateDatetime(res)) { + return TUnboxedValuePod(); + } + return TUnboxedValuePod((ui64)(res * 1000000ull)); } - SIMPLE_UDF(TFromMilliseconds, TOptional<TTimestamp>(TAutoMap<ui64>)) { + SIMPLE_UDF(TFromMilliseconds, TOptional<TTimestamp>(TAutoMap<ui64>)) { Y_UNUSED(valueBuilder); - auto res = args[0].Get<ui64>(); - if (res >= MAX_TIMESTAMP / 1000u) { - return TUnboxedValuePod(); - } - return TUnboxedValuePod(res * 1000u); + auto res = args[0].Get<ui64>(); + if (res >= MAX_TIMESTAMP / 1000u) { + return TUnboxedValuePod(); + } + return TUnboxedValuePod(res * 1000u); } - SIMPLE_UDF(TFromMicroseconds, TOptional<TTimestamp>(TAutoMap<ui64>)) { + SIMPLE_UDF(TFromMicroseconds, TOptional<TTimestamp>(TAutoMap<ui64>)) { Y_UNUSED(valueBuilder); - auto res = args[0].Get<ui64>(); - if (!ValidateTimestamp(res)) { - return TUnboxedValuePod(); - } - return TUnboxedValuePod(res); + auto res = args[0].Get<ui64>(); + if (!ValidateTimestamp(res)) { + return TUnboxedValuePod(); + } + return TUnboxedValuePod(res); } SIMPLE_UDF(TIntervalFromDays, TOptional<TInterval>(TAutoMap<i32>)) { @@ -942,53 +942,53 @@ namespace { return TUnboxedValuePod((i64)storage.ToTimeOfDay()); } - // Add ... - - SIMPLE_UDF(TShiftYears, TOptional<TResource<TMResourceName>>(TAutoMap<TResource<TMResourceName>>, i32)) { - auto result = args[0]; - auto& storage = TTMStorage::Reference(result); - storage.Year += args[1].Get<i32>(); - if (storage.Month == 2 && storage.Day == 29) { - bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year); - if (!isLeap) { - storage.Day--; - } - } - auto& builder = valueBuilder->GetDateBuilder(); - if (!storage.Validate(builder)) { - return TUnboxedValuePod(); - } - return result; - } - - TUnboxedValuePod DoAddMonths(const TUnboxedValuePod& date, i64 months, const IDateBuilder& builder) { - auto result = date; - auto& storage = TTMStorage::Reference(result); - i64 newMonth = months + storage.Month; - storage.Year += (newMonth - 1) / 12; - newMonth = 1 + (newMonth - 1) % 12; - if (newMonth <= 0) { - storage.Year--; - newMonth += 12; - } - storage.Month = newMonth; - bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year); - ui32 monthLength = NKikimr::NMiniKQL::GetMonthLength(storage.Month, isLeap); - storage.Day = std::min(monthLength, storage.Day); - if (!storage.Validate(builder)) { - return TUnboxedValuePod(); - } - return result; - } - - SIMPLE_UDF(TShiftQuarters, TOptional<TResource<TMResourceName>>(TAutoMap<TResource<TMResourceName>>, i32)) { - return DoAddMonths(args[0], 3ll * args[1].Get<i32>(), valueBuilder->GetDateBuilder()); - } - - SIMPLE_UDF(TShiftMonths, TOptional<TResource<TMResourceName>>(TAutoMap<TResource<TMResourceName>>, i32)) { - return DoAddMonths(args[0], args[1].Get<i32>(), valueBuilder->GetDateBuilder()); - } - + // Add ... + + SIMPLE_UDF(TShiftYears, TOptional<TResource<TMResourceName>>(TAutoMap<TResource<TMResourceName>>, i32)) { + auto result = args[0]; + auto& storage = TTMStorage::Reference(result); + storage.Year += args[1].Get<i32>(); + if (storage.Month == 2 && storage.Day == 29) { + bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year); + if (!isLeap) { + storage.Day--; + } + } + auto& builder = valueBuilder->GetDateBuilder(); + if (!storage.Validate(builder)) { + return TUnboxedValuePod(); + } + return result; + } + + TUnboxedValuePod DoAddMonths(const TUnboxedValuePod& date, i64 months, const IDateBuilder& builder) { + auto result = date; + auto& storage = TTMStorage::Reference(result); + i64 newMonth = months + storage.Month; + storage.Year += (newMonth - 1) / 12; + newMonth = 1 + (newMonth - 1) % 12; + if (newMonth <= 0) { + storage.Year--; + newMonth += 12; + } + storage.Month = newMonth; + bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year); + ui32 monthLength = NKikimr::NMiniKQL::GetMonthLength(storage.Month, isLeap); + storage.Day = std::min(monthLength, storage.Day); + if (!storage.Validate(builder)) { + return TUnboxedValuePod(); + } + return result; + } + + SIMPLE_UDF(TShiftQuarters, TOptional<TResource<TMResourceName>>(TAutoMap<TResource<TMResourceName>>, i32)) { + return DoAddMonths(args[0], 3ll * args[1].Get<i32>(), valueBuilder->GetDateBuilder()); + } + + SIMPLE_UDF(TShiftMonths, TOptional<TResource<TMResourceName>>(TAutoMap<TResource<TMResourceName>>, i32)) { + return DoAddMonths(args[0], args[1].Get<i32>(), valueBuilder->GetDateBuilder()); + } + template<size_t Digits, bool Exacly = true> struct PrintNDigits; @@ -1697,10 +1697,10 @@ namespace { TStartOf, TTimeOfDay, - TShiftYears, - TShiftQuarters, - TShiftMonths, - + TShiftYears, + TShiftQuarters, + TShiftMonths, + TUserDataTypeFuncFactory<true, ToSecondsName, TToSeconds, TDate, TDatetime, diff --git a/ydb/library/yql/udfs/common/hyperloglog/hyperloglog_udf.cpp b/ydb/library/yql/udfs/common/hyperloglog/hyperloglog_udf.cpp index d14fc617c5..b4b52639dc 100644 --- a/ydb/library/yql/udfs/common/hyperloglog/hyperloglog_udf.cpp +++ b/ydb/library/yql/udfs/common/hyperloglog/hyperloglog_udf.cpp @@ -2,133 +2,133 @@ #include <library/cpp/hyperloglog/hyperloglog.h> -#include <util/generic/hash_set.h> - -#include <variant> - +#include <util/generic/hash_set.h> + +#include <variant> + using namespace NKikimr; using namespace NUdf; namespace { - class THybridHyperLogLog { - private: - using THybridSet = THashSet<ui64, std::hash<ui64>, std::equal_to<ui64>, TStdAllocatorForUdf<ui64>>; - using THybridHll = THyperLogLogWithAlloc<TStdAllocatorForUdf<ui8>>; - - explicit THybridHyperLogLog(unsigned precision) - : Var(THybridSet()), SizeLimit((1u << precision) / 8), Precision(precision) - { } - - THybridHll ConvertToHyperLogLog() const { - auto res = THybridHll::Create(Precision); - for (auto& el : GetSetRef()) { - res.Update(el); - } - return res; - } - - bool IsSet() const { - return Var.index() == 1; - } - - const THybridSet& GetSetRef() const { - return std::get<1>(Var); - } - - THybridSet& GetMutableSetRef() { - return std::get<1>(Var); - } - - const THybridHll& GetHllRef() const { - return std::get<0>(Var); - } - - THybridHll& GetMutableHllRef() { - return std::get<0>(Var); - } - - public: - THybridHyperLogLog (THybridHyperLogLog&&) = default; - - THybridHyperLogLog& operator=(THybridHyperLogLog&&) = default; - - void Update(ui64 hash) { - if (IsSet()) { - GetMutableSetRef().insert(hash); - if (GetSetRef().size() >= SizeLimit) { - Var = ConvertToHyperLogLog(); - } - } else { - GetMutableHllRef().Update(hash); - } - } - - void Merge(const THybridHyperLogLog& rh) { - if (IsSet() && rh.IsSet()) { - GetMutableSetRef().insert(rh.GetSetRef().begin(), rh.GetSetRef().end()); - if (GetSetRef().size() >= SizeLimit) { - Var = ConvertToHyperLogLog(); - } - } else { - if (IsSet()) { - Var = ConvertToHyperLogLog(); - } - if (rh.IsSet()) { - GetMutableHllRef().Merge(rh.ConvertToHyperLogLog()); - } else { - GetMutableHllRef().Merge(rh.GetHllRef()); - } - } - } - - void Save(IOutputStream& out) const { - out.Write(static_cast<char>(Var.index())); - out.Write(static_cast<char>(Precision)); - if (IsSet()) { - ::Save(&out, GetSetRef()); - } else { - GetHllRef().Save(out); - } - } - - ui64 Estimate() const { - if (IsSet()) { - return GetSetRef().size(); - } - return GetHllRef().Estimate(); - } - - static THybridHyperLogLog Create(unsigned precision) { - Y_ENSURE(precision >= THyperLogLog::PRECISION_MIN && precision <= THyperLogLog::PRECISION_MAX); - return THybridHyperLogLog(precision); - } - - static THybridHyperLogLog Load(IInputStream& in) { - char type; - Y_ENSURE(in.ReadChar(type)); - char precision; - Y_ENSURE(in.ReadChar(precision)); - auto res = Create(precision); - if (type) { - ::Load(&in, res.GetMutableSetRef()); - } else { - res.Var = THybridHll::Load(in); - } - return res; - } - - private: - std::variant<THybridHll, THybridSet> Var; - - size_t SizeLimit; - - unsigned Precision; - }; - + class THybridHyperLogLog { + private: + using THybridSet = THashSet<ui64, std::hash<ui64>, std::equal_to<ui64>, TStdAllocatorForUdf<ui64>>; + using THybridHll = THyperLogLogWithAlloc<TStdAllocatorForUdf<ui8>>; + + explicit THybridHyperLogLog(unsigned precision) + : Var(THybridSet()), SizeLimit((1u << precision) / 8), Precision(precision) + { } + + THybridHll ConvertToHyperLogLog() const { + auto res = THybridHll::Create(Precision); + for (auto& el : GetSetRef()) { + res.Update(el); + } + return res; + } + + bool IsSet() const { + return Var.index() == 1; + } + + const THybridSet& GetSetRef() const { + return std::get<1>(Var); + } + + THybridSet& GetMutableSetRef() { + return std::get<1>(Var); + } + + const THybridHll& GetHllRef() const { + return std::get<0>(Var); + } + + THybridHll& GetMutableHllRef() { + return std::get<0>(Var); + } + + public: + THybridHyperLogLog (THybridHyperLogLog&&) = default; + + THybridHyperLogLog& operator=(THybridHyperLogLog&&) = default; + + void Update(ui64 hash) { + if (IsSet()) { + GetMutableSetRef().insert(hash); + if (GetSetRef().size() >= SizeLimit) { + Var = ConvertToHyperLogLog(); + } + } else { + GetMutableHllRef().Update(hash); + } + } + + void Merge(const THybridHyperLogLog& rh) { + if (IsSet() && rh.IsSet()) { + GetMutableSetRef().insert(rh.GetSetRef().begin(), rh.GetSetRef().end()); + if (GetSetRef().size() >= SizeLimit) { + Var = ConvertToHyperLogLog(); + } + } else { + if (IsSet()) { + Var = ConvertToHyperLogLog(); + } + if (rh.IsSet()) { + GetMutableHllRef().Merge(rh.ConvertToHyperLogLog()); + } else { + GetMutableHllRef().Merge(rh.GetHllRef()); + } + } + } + + void Save(IOutputStream& out) const { + out.Write(static_cast<char>(Var.index())); + out.Write(static_cast<char>(Precision)); + if (IsSet()) { + ::Save(&out, GetSetRef()); + } else { + GetHllRef().Save(out); + } + } + + ui64 Estimate() const { + if (IsSet()) { + return GetSetRef().size(); + } + return GetHllRef().Estimate(); + } + + static THybridHyperLogLog Create(unsigned precision) { + Y_ENSURE(precision >= THyperLogLog::PRECISION_MIN && precision <= THyperLogLog::PRECISION_MAX); + return THybridHyperLogLog(precision); + } + + static THybridHyperLogLog Load(IInputStream& in) { + char type; + Y_ENSURE(in.ReadChar(type)); + char precision; + Y_ENSURE(in.ReadChar(precision)); + auto res = Create(precision); + if (type) { + ::Load(&in, res.GetMutableSetRef()); + } else { + res.Var = THybridHll::Load(in); + } + return res; + } + + private: + std::variant<THybridHll, THybridSet> Var; + + size_t SizeLimit; + + unsigned Precision; + }; + extern const char HyperLogLogResourceName[] = "HyperLogLog.State"; - using THyperLogLogResource = TBoxedResource<THybridHyperLogLog, HyperLogLogResourceName>; - + using THyperLogLogResource = TBoxedResource<THybridHyperLogLog, HyperLogLogResourceName>; + class THyperLogLog_Create: public TBoxedValue { public: THyperLogLog_Create(TSourcePosition pos) @@ -145,7 +145,7 @@ namespace { const IValueBuilder*, const TUnboxedValuePod* args) const override { try { - THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Create(args[1].Get<ui32>()))); + THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Create(args[1].Get<ui32>()))); hll->Get()->Update(args[0].Get<ui64>()); return TUnboxedValuePod(hll.Release()); } catch (const std::exception& e) { @@ -288,7 +288,7 @@ namespace { Y_UNUSED(valueBuilder); const TString arg(args[0].AsStringRef()); TStringInput input(arg); - THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Load(input))); + THolder<THyperLogLogResource> hll(new THyperLogLogResource(THybridHyperLogLog::Load(input))); return TUnboxedValuePod(hll.Release()); } catch (const std::exception& e) { UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data()); |