aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Common/HyperLogLogWithSmallSetOptimization.h
blob: 1d2408186dea316ad4a255766ca4f1e8636f6040 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#pragma once

#include <memory>
#include <utility>

#include <boost/noncopyable.hpp>

#include <Common/HyperLogLogCounter.h>
#include <Common/HashTable/SmallTable.h>


namespace DB
{


/** Counter of distinct keys that starts as a small fixed-size set and switches to HyperLogLog on overflow.
  *
  * For a small number of keys - an array of fixed size "on the stack".
  * For large, HyperLogLog is allocated.
  * See also the more practical implementation in CombinedCardinalityEstimator.h,
  *  where a hash table is also used for medium-sized sets.
  *
  * Template parameters:
  *  Key             - type of the inserted values;
  *  small_set_size  - second template argument of SmallSet (the size of the small set);
  *  K               - forwarded as the first template argument of HyperLogLogCounter;
  *  Hash            - forwarded to HyperLogLogCounter;
  *  DenominatorType - forwarded to HyperLogLogCounter.
  *
  * The object is never converted back from the HyperLogLog state to the small state.
  */
template
<
    typename Key,
    UInt8 small_set_size,
    UInt8 K,
    typename Hash = IntHash32<Key>,
    typename DenominatorType = double>
class HyperLogLogWithSmallSetOptimization : private boost::noncopyable
{
private:
    using Small = SmallSet<Key, small_set_size>;
    using Large = HyperLogLogCounter<K, Key, Hash, UInt32, DenominatorType>;
    using LargeValueType = typename Large::value_type;

    Small small;

    /// Owns the HyperLogLog counter. Empty while the object is still in the small state.
    /// No user-declared destructor is needed: the counter is released together with this member.
    std::unique_ptr<Large> large;

    bool isLarge() const
    {
        return large != nullptr;
    }

    /// Switches to the HyperLogLog state: creates a new counter and feeds it every key from `small`.
    /// If a counter already exists, it is destroyed and replaced (not leaked).
    void toLarge()
    {
        /// The counter is filled through a local owner and published only when it is complete:
        ///  `isLarge()` stays false while `small` is being copied,
        ///  and the counter is freed if an insert throws.
        /// Plain `new` keeps default-initialization of Large (make_unique would value-initialize it).
        std::unique_ptr<Large> tmp_large{new Large};

        for (const auto & x : small)
            tmp_large->insert(static_cast<LargeValueType>(x.getValue()));

        large = std::move(tmp_large);
    }

public:
    using value_type = Key;

    /// Adds a value.
    /// In the small state a value that is already present is ignored; a new value is stored in `small`
    ///  while there is room, and the first new value that does not fit converts the object to HyperLogLog.
    /// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function
    void ALWAYS_INLINE insert(Key value)
    {
        if (!isLarge())
        {
            if (small.find(value) == small.end())
            {
                if (!small.full())
                    small.insert(value);
                else
                {
                    toLarge();
                    large->insert(static_cast<LargeValueType>(value));
                }
            }
        }
        else
            large->insert(static_cast<LargeValueType>(value));
    }

    /// Returns `small.size()` in the small state, otherwise the result of `size()` of the HyperLogLog counter.
    UInt64 size() const
    {
        return !isLarge() ? small.size() : large->size();
    }

    /// Adds the contents of `rhs` to this object.
    /// If `rhs` is in the HyperLogLog state, this object is converted too (if needed) and the counters are merged;
    ///  otherwise every key of the small set of `rhs` is inserted one by one.
    void merge(const HyperLogLogWithSmallSetOptimization & rhs)
    {
        if (rhs.isLarge())
        {
            if (!isLarge())
                toLarge();

            large->merge(*rhs.large);
        }
        else
        {
            for (const auto & x : rhs.small)
                insert(x.getValue());
        }
    }

    /// Reads the state in the format produced by `write`: a bool flag, then either the counter or the small set.
    /// You can only call for an empty object.
    void read(DB::ReadBuffer & in)
    {
        bool is_large;
        readBinary(is_large, in);

        if (is_large)
        {
            toLarge();
            large->read(in);
        }
        else
            small.read(in);
    }

    /// Reads a state in the format produced by `write` and merges it into this object
    ///  without materializing a second object.
    void readAndMerge(DB::ReadBuffer & in)
    {
        bool is_rhs_large;
        readBinary(is_rhs_large, in);

        /// A serialized HyperLogLog can only be merged into a HyperLogLog.
        if (!isLarge() && is_rhs_large)
            toLarge();

        if (!is_rhs_large)
        {
            /// Serialized small set: insert its keys one by one (this may still trigger the conversion).
            typename Small::Reader reader(in);
            while (reader.next())
                insert(reader.get());
        }
        else
            large->readAndMerge(in);
    }

    /// Writes a bool flag (true for the HyperLogLog state) followed by the counter or the small set.
    void write(DB::WriteBuffer & out) const
    {
        writeBinary(isLarge(), out);

        if (isLarge())
            large->write(out);
        else
            small.write(out);
    }
};


}