diff options
author | orlovorlov <[email protected]> | 2024-08-17 04:00:08 +0300 |
---|---|---|
committer | orlovorlov <[email protected]> | 2024-08-17 04:10:14 +0300 |
commit | 08c062b6cc9a8e38eba558b4a939457558d8a9ae (patch) | |
tree | 3f68e189d123b470d83db3ea46d97e79bc378925 | |
parent | 5644ad411b1c871b97bf329c99632736dc64ef2e (diff) |
YT-22157: Add HLL(4) to chunk meta
- Use lower precision to reduce size effect on meta
- Put HLL into a separate meta extension TLargeColumnarStatisticsExt
- Compress extension data with Zlib
- Clear new extension from TVersionedChunkMeta
Logic that produces the new extension is protected by the following flags:
- TChunkWriterOptions::EnableLargeStatistics option
- TableWriterEnableLargeStatistics in TOperationSpecBase
- TableWriterEnableLargeStatistics in native connection config
The flag will be rolled out as a spec template and as an rpc proxy dynconfig change.
4ca52347756971a47355f95ba17b05675e961c7e
-rw-r--r-- | yt/yt/client/api/rpc_proxy/helpers.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/driver/table_commands.cpp | 9 | ||||
-rw-r--r-- | yt/yt/client/table_client/columnar_statistics.cpp | 84 | ||||
-rw-r--r-- | yt/yt/client/table_client/columnar_statistics.h | 33 | ||||
-rw-r--r-- | yt/yt/client/table_client/config.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/table_client/config.h | 2 | ||||
-rw-r--r-- | yt/yt/client/table_client/unittests/columnar_statistics_ut.cpp | 381 | ||||
-rw-r--r-- | yt/yt/client/table_client/unittests/helpers/helpers.cpp | 12 | ||||
-rw-r--r-- | yt/yt/core/misc/hyperloglog.h | 78 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/hyperloglog_ut.cpp | 4 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 6 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto | 7 | ||||
-rw-r--r-- | yt/yt_proto/yt/core/misc/proto/hyperloglog.proto | 15 | ||||
-rw-r--r-- | yt/yt_proto/yt/core/ya.make | 1 |
14 files changed, 629 insertions, 10 deletions
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 418820bdce4..ec9587502da 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -1233,6 +1233,8 @@ void ToProto( if (statistics.LegacyChunkRowCount) { protoStatistics->set_legacy_chunk_row_count(*statistics.LegacyChunkRowCount); } + + ToProto(protoStatistics->mutable_column_hyperloglog_digests(), statistics.LargeStatistics.ColumnHyperLogLogDigests); } void FromProto( @@ -1261,6 +1263,8 @@ void FromProto( } else { statistics->LegacyChunkRowCount.reset(); } + + FromProto(&statistics->LargeStatistics.ColumnHyperLogLogDigests, protoStatistics.column_hyperloglog_digests()); } void ToProto( diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index d454ef00e7b..a45768580b7 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -404,6 +404,15 @@ void TGetTableColumnarStatisticsCommand::DoExecute(ICommandContextPtr context) } }); }) + .DoIf(statistics.HasLargeStatistics(), [&](TFluentMap fluent) { + fluent + .Item("column_estimated_unique_counts").DoMap([&](TFluentMap fluent) { + const auto& largeStat = statistics.LargeStatistics; + for (int index = 0; index < std::ssize(largeStat.ColumnHyperLogLogDigests); ++index) { + fluent.Item(columns[index]).Value(largeStat.ColumnHyperLogLogDigests[index].EstimateCardinality()); + } + }); + }) .OptionalItem("chunk_row_count", statistics.ChunkRowCount) .OptionalItem("legacy_chunk_row_count", statistics.LegacyChunkRowCount) .EndMap(); diff --git a/yt/yt/client/table_client/columnar_statistics.cpp b/yt/yt/client/table_client/columnar_statistics.cpp index c894a759154..a826e690ab3 100644 --- a/yt/yt/client/table_client/columnar_statistics.cpp +++ b/yt/yt/client/table_client/columnar_statistics.cpp @@ -72,6 +72,21 @@ TUnversionedOwningValue ApproximateMaxValue(TUnversionedValue value) } template <typename TRow> 
+void UpdateLargeColumnarStatistics(TLargeColumnarStatistics& statistics, TRange<TRow> rows) +{ + for (const auto& values : rows) { + for (const auto& value : values) { + if (value.Type != EValueType::Null) { + auto valueNoFlags = value; + valueNoFlags.Flags = EValueFlags::None; + auto fingerprint = TBitwiseUnversionedValueHash()(valueNoFlags); + statistics.ColumnHyperLogLogDigests[value.Id].Add(fingerprint); + } + } + } +} + +template <typename TRow> void UpdateColumnarStatistics(TColumnarStatistics& statistics, TRange<TRow> rows) { int maxId = -1; @@ -94,6 +109,10 @@ void UpdateColumnarStatistics(TColumnarStatistics& statistics, TRange<TRow> rows return; } + if (statistics.HasLargeStatistics()) { + UpdateLargeColumnarStatistics(statistics.LargeStatistics, rows); + } + // Vectors for precalculation of minimum and maximum values from rows that we are adding. std::vector<TUnversionedValue> minValues(maxId + 1, NullUnversionedValue); std::vector<TUnversionedValue> maxValues(maxId + 1, NullUnversionedValue); @@ -139,10 +158,37 @@ void UpdateColumnarStatistics(TColumnarStatistics& statistics, TRange<TRow> rows } // namespace +/////////////////////////////////////////////////////////////////////////////// + +bool TLargeColumnarStatistics::Empty() const +{ + return ColumnHyperLogLogDigests.empty(); +} + +void TLargeColumnarStatistics::Clear() +{ + ColumnHyperLogLogDigests.clear(); +} + +void TLargeColumnarStatistics::Resize(int columnCount) +{ + ColumnHyperLogLogDigests.resize(columnCount); +} + +TLargeColumnarStatistics& TLargeColumnarStatistics::operator+=(const TLargeColumnarStatistics& other) +{ + for (int index = 0; index < std::ssize(ColumnHyperLogLogDigests); ++index) { + ColumnHyperLogLogDigests[index].Merge(other.ColumnHyperLogLogDigests[index]); + } + return *this; +} + +/////////////////////////////////////////////////////////////////////////////// + TColumnarStatistics& TColumnarStatistics::operator+=(const TColumnarStatistics& other) { if (GetColumnCount() 
== 0) { - Resize(other.GetColumnCount(), other.HasValueStatistics()); + Resize(other.GetColumnCount(), other.HasValueStatistics(), other.HasLargeStatistics()); } YT_VERIFY(GetColumnCount() == other.GetColumnCount()); @@ -169,6 +215,7 @@ TColumnarStatistics& TColumnarStatistics::operator+=(const TColumnarStatistics& if (!other.HasValueStatistics()) { ClearValueStatistics(); } else if (HasValueStatistics()) { + bool mergeLargeStatistics = HasLargeStatistics() && other.HasLargeStatistics(); for (int index = 0; index < GetColumnCount(); ++index) { if (other.ColumnMinValues[index] != NullUnversionedValue && @@ -185,14 +232,20 @@ TColumnarStatistics& TColumnarStatistics::operator+=(const TColumnarStatistics& ColumnNonNullValueCounts[index] += other.ColumnNonNullValueCounts[index]; } + if (mergeLargeStatistics) { + LargeStatistics += other.LargeStatistics; + } else { + LargeStatistics.Clear(); + } } + return *this; } -TColumnarStatistics TColumnarStatistics::MakeEmpty(int columnCount, bool hasValueStatistics) +TColumnarStatistics TColumnarStatistics::MakeEmpty(int columnCount, bool hasValueStatistics, bool hasLargeStatistics) { TColumnarStatistics result; - result.Resize(columnCount, hasValueStatistics); + result.Resize(columnCount, hasValueStatistics, hasLargeStatistics); return result; } @@ -234,11 +287,17 @@ bool TColumnarStatistics::HasValueStatistics() const return GetColumnCount() == 0 || !ColumnMinValues.empty(); } +bool TColumnarStatistics::HasLargeStatistics() const +{ + return GetColumnCount() == 0 || !LargeStatistics.Empty(); +} + void TColumnarStatistics::ClearValueStatistics() { ColumnMinValues.clear(); ColumnMaxValues.clear(); ColumnNonNullValueCounts.clear(); + LargeStatistics.Clear(); } int TColumnarStatistics::GetColumnCount() const @@ -246,9 +305,14 @@ int TColumnarStatistics::GetColumnCount() const return ColumnDataWeights.size(); } -void TColumnarStatistics::Resize(int columnCount, bool keepValueStatistics) +void TColumnarStatistics::Resize(int 
columnCount, bool keepValueStatistics, bool keepLargeStatistics) { + if (columnCount < GetColumnCount()) { + // Downsizes are not allowed. If reducing column count, must clear the stats completely. + YT_VERIFY(columnCount == 0); + } keepValueStatistics &= HasValueStatistics(); + keepLargeStatistics &= (keepValueStatistics && HasLargeStatistics()); ColumnDataWeights.resize(columnCount, 0); @@ -256,6 +320,12 @@ void TColumnarStatistics::Resize(int columnCount, bool keepValueStatistics) ColumnMinValues.resize(columnCount, NullUnversionedValue); ColumnMaxValues.resize(columnCount, NullUnversionedValue); ColumnNonNullValueCounts.resize(columnCount, 0); + + if (keepLargeStatistics) { + LargeStatistics.Resize(columnCount); + } else { + LargeStatistics.Clear(); + } } else { ClearValueStatistics(); } @@ -298,7 +368,7 @@ void TColumnarStatistics::Update(TRange<TVersionedRow> rows) TColumnarStatistics TColumnarStatistics::SelectByColumnNames(const TNameTablePtr& nameTable, const std::vector<TColumnStableName>& columnStableNames) const { - auto result = MakeEmpty(columnStableNames.size(), HasValueStatistics()); + auto result = MakeEmpty(columnStableNames.size(), HasValueStatistics(), HasLargeStatistics()); for (const auto& [columnIndex, columnName] : Enumerate(columnStableNames)) { if (auto id = nameTable->FindId(columnName.Underlying()); id && *id < GetColumnCount()) { @@ -308,6 +378,10 @@ TColumnarStatistics TColumnarStatistics::SelectByColumnNames(const TNameTablePtr result.ColumnMinValues[columnIndex] = ColumnMinValues[*id]; result.ColumnMaxValues[columnIndex] = ColumnMaxValues[*id]; result.ColumnNonNullValueCounts[columnIndex] = ColumnNonNullValueCounts[*id]; + + if (HasLargeStatistics()) { + result.LargeStatistics.ColumnHyperLogLogDigests[columnIndex] = LargeStatistics.ColumnHyperLogLogDigests[*id]; + } } } } diff --git a/yt/yt/client/table_client/columnar_statistics.h b/yt/yt/client/table_client/columnar_statistics.h index a4e3514f76f..82dbec47c0f 100644 --- 
a/yt/yt/client/table_client/columnar_statistics.h +++ b/yt/yt/client/table_client/columnar_statistics.h @@ -2,6 +2,8 @@ #include "public.h" +#include <yt/yt/core/misc/hyperloglog.h> + namespace NYT::NTableClient { //////////////////////////////////////////////////////////////////////////////// @@ -28,6 +30,25 @@ struct TNamedColumnarStatistics TNamedColumnarStatistics& operator +=(const TNamedColumnarStatistics& other); }; +typedef THyperLogLog< + // Precision = 4, or 16 registers + 4 +> TColumnarHyperLogLogDigest; + +//! This struct includes large per-column statistics too big to fit together with basic stats +struct TLargeColumnarStatistics +{ + //! Per-column HyperLogLog digest to approximate number of unique values in the column. + std::vector<TColumnarHyperLogLogDigest> ColumnHyperLogLogDigests; + + bool Empty() const; + void Clear(); + void Resize(int columnCount); + + bool operator==(const TLargeColumnarStatistics& other) const = default; + TLargeColumnarStatistics& operator+=(const TLargeColumnarStatistics& other); +}; + //! TColumnarStatistics stores per-column statistics of data stored in a chunk/table. struct TColumnarStatistics { @@ -65,10 +86,13 @@ struct TColumnarStatistics //! Can be missing only if the cluster version is 23.1 or older. std::optional<i64> LegacyChunkRowCount = 0; + //! Large per-column statistics, including columnar HLL. + TLargeColumnarStatistics LargeStatistics; + TColumnarStatistics& operator+=(const TColumnarStatistics& other); bool operator==(const TColumnarStatistics& other) const = default; - static TColumnarStatistics MakeEmpty(int columnCount, bool hasValueStatistics = true); + static TColumnarStatistics MakeEmpty(int columnCount, bool hasValueStatistics = true, bool hasLargeStatistics = true); static TColumnarStatistics MakeLegacy(int columnCount, i64 legacyChunkDataWeight, i64 legacyChunkRowCount); TLightweightColumnarStatistics MakeLightweightStatistics() const; @@ -77,12 +101,17 @@ struct TColumnarStatistics //! 
Checks if there are minimum, maximum, and non-null value statistics. bool HasValueStatistics() const; + bool HasLargeStatistics() const; + //! Clears minimum, maximum, and non-null value statistics. void ClearValueStatistics(); int GetColumnCount() const; - void Resize(int columnCount, bool keepValueStatistics = true); + //! Changes column count. + //! Existing value statistics are kept or erased depending on now keepValueStatistics/keepHyperLogLogDigests flags are set. + //! keepHyperLogLogDigests only has effect when keepValueStatistics set to true. + void Resize(int columnCount, bool keepValueStatistics = true, bool keepLargeStatistics = true); void Update(TRange<TUnversionedRow> rows); void Update(TRange<TVersionedRow> rows); diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp index e704705cf2c..e07b03d6f8b 100644 --- a/yt/yt/client/table_client/config.cpp +++ b/yt/yt/client/table_client/config.cpp @@ -169,6 +169,9 @@ void TChunkWriterConfig::Register(TRegistrar registrar) .DefaultNew(); registrar.Parameter("key_prefix_filter", &TThis::KeyPrefixFilter) .DefaultNew(); + + registrar.Parameter("enable_large_columnar_statistics", &TThis::EnableLargeColumnarStatistics) + .Default(false); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h index 95f5900851c..e9cebc18c16 100644 --- a/yt/yt/client/table_client/config.h +++ b/yt/yt/client/table_client/config.h @@ -154,6 +154,8 @@ public: double SampleRate; + bool EnableLargeColumnarStatistics; + TChunkIndexesWriterConfigPtr ChunkIndexes; TSlimVersionedWriterConfigPtr Slim; diff --git a/yt/yt/client/table_client/unittests/columnar_statistics_ut.cpp b/yt/yt/client/table_client/unittests/columnar_statistics_ut.cpp index 47054dff7b0..ef7fd233514 100644 --- a/yt/yt/client/table_client/unittests/columnar_statistics_ut.cpp +++ 
b/yt/yt/client/table_client/unittests/columnar_statistics_ut.cpp @@ -22,6 +22,46 @@ std::vector<TUnversionedRow> CaptureRows(TRowBufferPtr rowBuffer, const std::vec return result; } +void UpdateValueHll(TUnversionedValue value, std::vector<TColumnarHyperLogLogDigest>* hll) +{ + if (value.Type == EValueType::Null) { + return; + } + auto fingerprint = TBitwiseUnversionedValueHash()(value); + hll->at(value.Id).Add(fingerprint); +} + +std::vector<TColumnarHyperLogLogDigest> ComputeHll(const std::vector<TUnversionedRow>& rows) { + std::vector<TColumnarHyperLogLogDigest> result; + if (rows.empty()) { + return result; + } + result.resize(rows[0].GetCount(), TColumnarHyperLogLogDigest()); + for (const auto& row : rows) { + for (auto value : row) { + UpdateValueHll(value, &result); + } + } + return result; +} + +std::vector<TColumnarHyperLogLogDigest> ComputeHll(const std::vector<TVersionedRow>& rows) { + std::vector<TColumnarHyperLogLogDigest> result; + if (rows.empty()) { + return result; + } + result.resize(rows[0].GetKeyCount() + rows[0].GetValueCount(), TColumnarHyperLogLogDigest()); + for (const auto& row : rows) { + for (auto value : row.Keys()) { + UpdateValueHll(value, &result); + } + for (auto value : row.Values()) { + UpdateValueHll(value, &result); + } + } + return result; +} + //////////////////////////////////////////////////////////////////////////////// TEST(TUpdateColumnarStatisticsTest, EmptyStruct) @@ -63,6 +103,7 @@ TEST(TUpdateColumnarStatisticsTest, EmptyStruct) .ColumnNonNullValueCounts = {2, 3, 3}, .ChunkRowCount = 3, .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = ComputeHll(rows)} }; EXPECT_EQ(statistics, expected); } @@ -86,6 +127,10 @@ TEST(TUpdateColumnarStatisticsTest, InitializedStruct) .ColumnNonNullValueCounts = {5, 10, 50, 3}, .ChunkRowCount = 50, .LegacyChunkRowCount = 3, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + 
std::vector<TColumnarHyperLogLogDigest>(4, TColumnarHyperLogLogDigest()) + } }; auto rowBuffer = New<TRowBuffer>(); @@ -123,6 +168,7 @@ TEST(TUpdateColumnarStatisticsTest, InitializedStruct) .ColumnNonNullValueCounts = {7, 12, 52, 5}, .ChunkRowCount = 52, .LegacyChunkRowCount = 3, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = ComputeHll(rows)} }; EXPECT_EQ(statistics, expected); } @@ -156,6 +202,7 @@ TEST(TUpdateColumnarStatisticsTest, DefaultStruct) .ColumnNonNullValueCounts = {1, 1, 1}, .ChunkRowCount = 1, .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = ComputeHll(rows)} }; EXPECT_EQ(statistics, expected); } @@ -189,10 +236,279 @@ TEST(TUpdateColumnarStatisticsTest, StructSizeLessThanRowSize) .ColumnNonNullValueCounts = {1, 1, 1}, .ChunkRowCount = 1, .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = ComputeHll(rows)} }; EXPECT_EQ(statistics, expected); } +TEST(TUpdateColumnarStatisticsTest, ResizeNoHll) +{ + TColumnarStatistics original{ + .ColumnDataWeights = {8, 4, 8}, + .ColumnMinValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + }, + .ColumnNonNullValueCounts = {1, 1, 1}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + }; + + auto statistics = original; + statistics.Resize(4, true /* keep value statistics and hll */); + + TColumnarStatistics expected{ + .ColumnDataWeights = {8, 4, 8, 0}, + .ColumnMinValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + MakeUnversionedNullValue() + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + MakeUnversionedNullValue() + }, + 
.ColumnNonNullValueCounts = {1, 1, 1, 0}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + }; + + EXPECT_TRUE(statistics.HasValueStatistics()); + EXPECT_FALSE(statistics.HasLargeStatistics()); + EXPECT_TRUE(statistics.LargeStatistics.ColumnHyperLogLogDigests.empty()); + EXPECT_EQ(statistics, expected); + + // Clears out min/max/not null counts. + statistics.Resize(4, false /* keep value statistics */); + + TColumnarStatistics expectedClear{ + .ColumnDataWeights = {8, 4, 8, 0}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + }; + EXPECT_EQ(statistics, expectedClear); +} + +TEST(TUpdateColumnarStatisticsTest, ResizeHll) +{ + auto rowBuffer = New<TRowBuffer>(); + auto rows = CaptureRows(rowBuffer, { + { + MakeUnversionedInt64Value(12, 0), + MakeUnversionedStringValue("buzz", 1), + MakeUnversionedDoubleValue(1e70, 2), + }, + }); + + auto statistics = TColumnarStatistics::MakeEmpty(1); + statistics.Update(rows); + + auto expectedHll = ComputeHll(rows); + expectedHll.push_back(TColumnarHyperLogLogDigest()); + + TColumnarStatistics expected{ + .ColumnDataWeights = {8, 4, 8, 0}, + .ColumnMinValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + MakeUnversionedNullValue() + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + MakeUnversionedNullValue() + }, + .ColumnNonNullValueCounts = {1, 1, 1, 0}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = expectedHll} + }; + + statistics.Resize(4, true /* keep value statistics and hll */); + EXPECT_EQ(statistics, expected); + + statistics.Resize(4, true /* keep value statistics */, false /* keep hll */); + TColumnarStatistics expectedNoHll{ + .ColumnDataWeights = {8, 4, 8, 0}, + .ColumnMinValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + 
MakeUnversionedNullValue() + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(1e70), + MakeUnversionedNullValue() + }, + .ColumnNonNullValueCounts = {1, 1, 1, 0}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {}} + }; + EXPECT_EQ(statistics, expectedNoHll); + + statistics.Resize(4, false /* keep value statistics/hll */); + + TColumnarStatistics expectedClear{ + .ColumnDataWeights = {8, 4, 8, 0}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + }; + EXPECT_EQ(statistics, expectedClear); +} + +TEST(TUpdateColumnarStatisticsTest, CombineHllAndNoHll) +{ + auto rowBuffer = New<TRowBuffer>(); + auto rows1 = CaptureRows(rowBuffer, { + { + MakeUnversionedInt64Value(12, 0), + MakeUnversionedStringValue("buzz", 1), + MakeUnversionedDoubleValue(1e70, 2), + }, + { + MakeUnversionedInt64Value(-7338, 0), + MakeUnversionedStringValue("foo", 1), + MakeUnversionedDoubleValue(-0.16, 2), + }, + }); + + auto statistics1 = TColumnarStatistics::MakeEmpty(3); + statistics1.Update(rows1); + auto expectedHll1 = ComputeHll(rows1); + + TColumnarStatistics expected1{ + .ColumnDataWeights = {16, 7, 16}, + .ColumnMinValues = { + MakeUnversionedInt64Value(-7338), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(-0.16), + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("foo"), + MakeUnversionedDoubleValue(1e70), + }, + .ColumnNonNullValueCounts = {2, 2, 2}, + .ChunkRowCount = 2, + .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = expectedHll1} + }; + + EXPECT_EQ(statistics1, expected1); + + auto rows2 = CaptureRows(rowBuffer, { + { + MakeUnversionedNullValue(0), + MakeUnversionedStringValue("chyt", 1), + MakeUnversionedDoubleValue(-15.0, 2), + }, + }); + + auto statistics2 = TColumnarStatistics::MakeEmpty(3, true /* keep value 
statistics */, false /* keep hll */); + EXPECT_TRUE(statistics2.HasValueStatistics()); + EXPECT_FALSE(statistics2.HasLargeStatistics()); + statistics2.Update(rows2); + + TColumnarStatistics expected2{ + .ColumnDataWeights = {0, 4, 8}, + .ColumnMinValues = { + MakeUnversionedNullValue(), + MakeUnversionedStringValue("chyt"), + MakeUnversionedDoubleValue(-15.0), + }, + .ColumnMaxValues = { + MakeUnversionedNullValue(), + MakeUnversionedStringValue("chyt"), + MakeUnversionedDoubleValue(-15.0), + }, + .ColumnNonNullValueCounts = {0, 1, 1}, + .ChunkRowCount = 1, + .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {}} + }; + + EXPECT_EQ(statistics2, expected2); + + auto statistics3 = statistics1; + statistics3 += statistics2; + + // statistics2 does not have hyperloglog, hence it does not change going statistics1 + // to statistics3. + TColumnarStatistics expected3{ + .ColumnDataWeights = {16, 11, 24}, + .ColumnMinValues = { + MakeUnversionedInt64Value(-7338), + MakeUnversionedStringValue("buzz"), + MakeUnversionedDoubleValue(-15.0), + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(12), + MakeUnversionedStringValue("foo"), + MakeUnversionedDoubleValue(1e70), + }, + .ColumnNonNullValueCounts = {2, 3, 3}, + .ChunkRowCount = 3, + .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {}} + }; + + EXPECT_EQ(statistics3, expected3); + + auto rows4 = CaptureRows(rowBuffer, { + { + MakeUnversionedInt64Value(200), + MakeUnversionedStringValue("booboo", 1), + MakeUnversionedDoubleValue(-std::numeric_limits<double>::infinity(), 2), + }, + }); + statistics2.Update(rows4); + + auto statistics4 = statistics1; + statistics4.Update(rows4); + std::vector<TUnversionedRow> rowsCombined(rows1); + std::copy(rows4.begin(), rows4.end(), std::back_inserter(rowsCombined)); + auto expectedHll4 = ComputeHll(rowsCombined); + + // statistics1 includes column hyperloglog, and updating it with more 
rows changes + // hyperloglog values. + TColumnarStatistics expected4{ + .ColumnDataWeights = {24, 13, 24}, + .ColumnMinValues = { + MakeUnversionedInt64Value(-7338), + MakeUnversionedStringValue("booboo"), + MakeUnversionedDoubleValue(-std::numeric_limits<double>::infinity()), + }, + .ColumnMaxValues = { + MakeUnversionedInt64Value(200), + MakeUnversionedStringValue("foo"), + MakeUnversionedDoubleValue(1e70), + }, + .ColumnNonNullValueCounts = {3, 3, 3}, + .ChunkRowCount = 3, + .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = expectedHll4} + }; + + EXPECT_EQ(statistics4, expected4); + + TColumnarStatistics empty; + empty += statistics2; +} + TEST(TUpdateColumnarStatisticsTest, NoValueStatistics) { auto rowBuffer = New<TRowBuffer>(); @@ -303,6 +619,7 @@ TEST(TUpdateColumnarStatisticsTest, DifferentTypesInOneColumn) .ColumnNonNullValueCounts = {3}, .ChunkRowCount = 3, .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = ComputeHll(rows)} }; EXPECT_EQ(statistics, expected); } @@ -343,6 +660,7 @@ TEST(TUpdateColumnarStatisticsTest, VersionedRow) .ColumnNonNullValueCounts = {2, 3, 2}, .ChunkRowCount = 2, .LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = ComputeHll(rows)} }; EXPECT_EQ(statistics, expected); } @@ -369,6 +687,10 @@ TEST(TMergeColumnarStatisticsTest, EmptyAndNonEmpty) .ColumnNonNullValueCounts = {5, 10, 50, 3}, .ChunkRowCount = 50, .LegacyChunkRowCount = 5, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + std::vector<TColumnarHyperLogLogDigest>(4, TColumnarHyperLogLogDigest()) + } }; lhs += rhs; EXPECT_EQ(lhs, rhs); @@ -393,6 +715,10 @@ TEST(TMergeColumnarStatisticsTest, NonEmptyAndEmpty) .ColumnNonNullValueCounts = {5, 10, 50, 3}, .ChunkRowCount = 50, .LegacyChunkRowCount = 5, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + 
std::vector<TColumnarHyperLogLogDigest>(4, TColumnarHyperLogLogDigest()) + } }; auto rhs = TColumnarStatistics::MakeEmpty(4); auto oldLhs = lhs; @@ -425,6 +751,10 @@ TEST(TMergeColumnarStatisticsTest, DifferentTypes) .ColumnNonNullValueCounts = {8, 5, 10, 8, 3, 9, 0}, .ChunkRowCount = 10, .LegacyChunkRowCount = 2, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + std::vector<TColumnarHyperLogLogDigest>(7, TColumnarHyperLogLogDigest()) + } }; TColumnarStatistics rhs{ @@ -450,6 +780,10 @@ TEST(TMergeColumnarStatisticsTest, DifferentTypes) .ColumnNonNullValueCounts = {12, 7, 10, 10, 2, 8, 0}, .ChunkRowCount = 12, .LegacyChunkRowCount = 3, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + std::vector<TColumnarHyperLogLogDigest>(7, TColumnarHyperLogLogDigest()) + } }; lhs += rhs; @@ -477,6 +811,10 @@ TEST(TMergeColumnarStatisticsTest, DifferentTypes) .ColumnNonNullValueCounts = {20, 12, 20, 18, 5, 17, 0}, .ChunkRowCount = 22, .LegacyChunkRowCount = 5, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + std::vector<TColumnarHyperLogLogDigest>(7, TColumnarHyperLogLogDigest()) + } }; EXPECT_EQ(lhs, expected); } @@ -489,6 +827,7 @@ TEST(TMergeColumnarStatisticsTest, DifferentTypesInOneColumn) .ColumnMaxValues = {MakeUnversionedBooleanValue(false)}, .ColumnNonNullValueCounts = {20}, .ChunkRowCount = 20, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {TColumnarHyperLogLogDigest()},} }; TColumnarStatistics rhs{ @@ -497,6 +836,7 @@ TEST(TMergeColumnarStatisticsTest, DifferentTypesInOneColumn) .ColumnMaxValues = {MakeUnversionedStringValue("pick")}, .ColumnNonNullValueCounts = {13}, .ChunkRowCount = 13, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {TColumnarHyperLogLogDigest()},} }; lhs += rhs; @@ -508,6 +848,7 @@ TEST(TMergeColumnarStatisticsTest, DifferentTypesInOneColumn) .ColumnNonNullValueCounts = {33}, .ChunkRowCount = 33, 
.LegacyChunkRowCount = 0, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {TColumnarHyperLogLogDigest()},} }; EXPECT_EQ(lhs, expected); } @@ -590,6 +931,10 @@ TEST(TColumnarStatisticsColumnSelectionTest, ColumnSelect) .ColumnNonNullValueCounts = {8, 5, 10, 8, 3, 9, 0}, .ChunkRowCount = 10, .LegacyChunkRowCount = 2, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + std::vector<TColumnarHyperLogLogDigest>(7, TColumnarHyperLogLogDigest()) + } }; auto nameTable = TNameTable::FromKeyColumns({"buzz", "off", "taken", "sec", "list", "size", "friend"}); std::vector<TColumnStableName> stableNames = { @@ -624,8 +969,44 @@ TEST(TColumnarStatisticsColumnSelectionTest, ColumnSelect) .ColumnNonNullValueCounts = {0, 10, 8, 0, 3, 0}, .ChunkRowCount = 10, .LegacyChunkRowCount = 2, + .LargeStatistics = TLargeColumnarStatistics{ + .ColumnHyperLogLogDigests = + std::vector<TColumnarHyperLogLogDigest>(6, TColumnarHyperLogLogDigest()) + } }; EXPECT_EQ(selectedStatistics, expected); + + auto statisticsNoHll = statistics; + statisticsNoHll.LargeStatistics.ColumnHyperLogLogDigests.clear(); + EXPECT_TRUE(statisticsNoHll.HasValueStatistics()); + EXPECT_FALSE(statisticsNoHll.HasLargeStatistics()); + + auto selectedStatisticsNoHll = statisticsNoHll.SelectByColumnNames(nameTable, stableNames); + + TColumnarStatistics expectedNoHll{ + .ColumnDataWeights = {0, 10, 64, 0, 100, 0}, + .ColumnMinValues = { + MakeUnversionedNullValue(), + MakeUnversionedBooleanValue(true), + MakeUnversionedInt64Value(-10), + MakeUnversionedNullValue(), + MakeUnversionedSentinelValue(EValueType::Min), + MakeUnversionedNullValue(), + }, + .ColumnMaxValues = { + MakeUnversionedNullValue(), + MakeUnversionedBooleanValue(true), + MakeUnversionedInt64Value(20), + MakeUnversionedNullValue(), + MakeUnversionedSentinelValue(EValueType::Max), + MakeUnversionedNullValue(), + }, + .ColumnNonNullValueCounts = {0, 10, 8, 0, 3, 0}, + .ChunkRowCount = 10, + .LegacyChunkRowCount = 
2, + .LargeStatistics = TLargeColumnarStatistics{.ColumnHyperLogLogDigests = {}} + }; + EXPECT_EQ(selectedStatisticsNoHll, expectedNoHll); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/unittests/helpers/helpers.cpp b/yt/yt/client/table_client/unittests/helpers/helpers.cpp index d59dcd9a586..0f4955f6880 100644 --- a/yt/yt/client/table_client/unittests/helpers/helpers.cpp +++ b/yt/yt/client/table_client/unittests/helpers/helpers.cpp @@ -274,7 +274,17 @@ void PrintTo(const TColumnarStatistics& statistics, std::ostream* os) << "ChunkRowCount: " << ::testing::PrintToString(statistics.ChunkRowCount) << "\n" << "LegacyChunkRowCount: " - << ::testing::PrintToString(statistics.LegacyChunkRowCount); + << ::testing::PrintToString(statistics.LegacyChunkRowCount) << "\n"; + + if (!statistics.LargeStatistics.Empty()) { + *os << "ColumnHyperLogLogDigests: [\n"; + for (const auto& hyperLogLog : statistics.LargeStatistics.ColumnHyperLogLogDigests) { + *os << " "; + *os << ToString(hyperLogLog); + *os << "\n"; + } + *os << "]\n"; + } } NTableChunkFormat::NProto::TSegmentMeta CreateSimpleSegmentMeta() diff --git a/yt/yt/core/misc/hyperloglog.h b/yt/yt/core/misc/hyperloglog.h index 88f9228ddd7..d5f7a706d95 100644 --- a/yt/yt/core/misc/hyperloglog.h +++ b/yt/yt/core/misc/hyperloglog.h @@ -4,6 +4,9 @@ #include "hyperloglog_bias.h" #include "farm_hash.h" +#include <library/cpp/yt/memory/range.h> +#include <yt/yt_proto/yt/core/misc/proto/hyperloglog.pb.h> + #include <cmath> namespace NYT { @@ -11,6 +14,24 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// template <int Precision> +class THyperLogLog; + +template <int Precision> +void FormatValue(TStringBuilderBase* builder, const THyperLogLog<Precision>& value, TStringBuf format); + +template <int Precision> +void ToProto( + NProto::THyperLogLog* protoHyperLogLog, + const THyperLogLog<Precision>& hyperloglog); + 
+template <int Precision> +void FromProto( + THyperLogLog<Precision>* hyperloglog, + const NProto::THyperLogLog& protoHyperLogLog); + +//////////////////////////////////////////////////////////////////////////////// + +template <int Precision> class THyperLogLog { static_assert( @@ -18,7 +39,10 @@ class THyperLogLog "Precision is out of range (expected to be within 4..18)"); public: + static constexpr ui64 RegisterCount = (ui64)1 << Precision; + THyperLogLog(); + explicit THyperLogLog(TRange<char> data); void Add(TFingerprint fingerprint); @@ -26,10 +50,23 @@ public: ui64 EstimateCardinality() const; + TRange<char> Data() const; + static ui64 EstimateCardinality(const std::vector<ui64>& values); + bool operator==(const THyperLogLog<Precision>& other) const = default; + private: - static constexpr ui64 RegisterCount = (ui64)1 << Precision; + friend void ToProto<Precision>( + NProto::THyperLogLog* protoHyperLogLog, + const THyperLogLog<Precision>& hyperloglog); + + friend void FromProto<Precision>( + THyperLogLog<Precision>* hyperloglog, + const NProto::THyperLogLog& protoHyperLogLog); + + friend void FormatValue<Precision>(TStringBuilderBase* builder, const THyperLogLog<Precision>& value, TStringBuf format); + static constexpr ui64 PrecisionMask = RegisterCount - 1; static constexpr double Threshold = NDetail::Thresholds[Precision - 4]; static constexpr int Size = NDetail::Sizes[Precision - 4]; @@ -48,6 +85,19 @@ THyperLogLog<Precision>::THyperLogLog() } template <int Precision> +THyperLogLog<Precision>::THyperLogLog(TRange<char> data) +{ + YT_VERIFY(data.size() == RegisterCount); + std::copy(data.begin(), data.end(), ZeroCounts_.begin()); +} + +template <int Precision> +TRange<char> THyperLogLog<Precision>::Data() const +{ + return TRange<char>(reinterpret_cast<const char*>(ZeroCounts_.begin()), ZeroCounts_.size()); +} + +template <int Precision> void THyperLogLog<Precision>::Add(TFingerprint fingerprint) { fingerprint |= ((ui64)1 << 63); @@ -134,6 +184,32 @@ ui64 
THyperLogLog<Precision>::EstimateCardinality(const std::vector<ui64>& value return state.EstimateCardinality(); } +template <int Precision> +void ToProto( + NProto::THyperLogLog* protoHyperLogLog, + const THyperLogLog<Precision>& hyperloglog) +{ + protoHyperLogLog->set_data(hyperloglog.Data().begin(), hyperloglog.Data().size()); +} + +template <int Precision> +void FromProto( + THyperLogLog<Precision>* hyperloglog, + const NProto::THyperLogLog& protoHyperLogLog) +{ + YT_VERIFY(protoHyperLogLog.data().size() == std::size(hyperloglog->ZeroCounts_)); + std::copy(protoHyperLogLog.data().begin(), protoHyperLogLog.data().end(), hyperloglog->ZeroCounts_.begin()); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <int Precision> +void FormatValue(TStringBuilderBase* builder, const THyperLogLog<Precision>& value, TStringBuf /*format*/) +{ + builder->AppendFormat("%v", std::span<const char>( + reinterpret_cast<const char*>(value.ZeroCounts_.begin()), value.ZeroCounts_.size())); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/yt/core/misc/unittests/hyperloglog_ut.cpp b/yt/yt/core/misc/unittests/hyperloglog_ut.cpp index 188b4ec24fa..4f23817757f 100644 --- a/yt/yt/core/misc/unittests/hyperloglog_ut.cpp +++ b/yt/yt/core/misc/unittests/hyperloglog_ut.cpp @@ -53,6 +53,10 @@ TEST_P(THyperLogLogTest, Random) rng, size, targetCardinality); + + THyperLogLog<8> hllClone(hll.first.Data()); + EXPECT_EQ(hll.first.EstimateCardinality(), hllClone.EstimateCardinality()); + auto err = ((double)hll.first.EstimateCardinality() - hll.second) / hll.second; error += err; } diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 67c8ba88bde..79e4c38f4f7 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ 
-6,8 +6,9 @@ option java_multiple_files = true; option go_package = "a.yandex-team.ru/yt/go/proto/client/api/rpc_proxy"; -import "yt_proto/yt/core/misc/proto/guid.proto"; import "yt_proto/yt/core/misc/proto/error.proto"; +import "yt_proto/yt/core/misc/proto/guid.proto"; +import "yt_proto/yt/core/misc/proto/hyperloglog.proto"; import "yt_proto/yt/core/ytree/proto/attributes.proto"; import "yt_proto/yt/core/ytree/proto/request_complexity_limits.proto"; import "yt_proto/yt/client/chunk_client/proto/data_statistics.proto"; @@ -2062,6 +2063,9 @@ message TColumnarStatistics optional int64 chunk_row_count = 12; // Total number of rows in legacy chunks whose meta misses columnar statistics. optional int64 legacy_chunk_row_count = 13; + + // Per-column hyperloglog digest to approximate number of unique values + repeated NYT.NProto.THyperLogLog column_hyperloglog_digests = 14; } message TRspGetColumnarStatistics diff --git a/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto b/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto index 6518287f9d3..7ca77a0622f 100644 --- a/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto +++ b/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto @@ -245,6 +245,12 @@ message TColumnGroupInfosExt repeated int32 column_to_group = 3; } +message TLargeColumnarStatisticsExt +{ + // Digests for all columns, potentially compressed, in one byte string. + optional bytes column_hyperloglog_digests = 1; +} + message TColumnarStatisticsExt { repeated int64 column_data_weights = 1; @@ -263,6 +269,7 @@ message TColumnarStatisticsExt // Total number of rows in a chunk. optional int64 chunk_row_count = 7; + // This field is reserved for consistency with TColumnarStatistics message. It has never been used. 
reserved 8; } diff --git a/yt/yt_proto/yt/core/misc/proto/hyperloglog.proto b/yt/yt_proto/yt/core/misc/proto/hyperloglog.proto new file mode 100644 index 00000000000..c5aaa77753a --- /dev/null +++ b/yt/yt_proto/yt/core/misc/proto/hyperloglog.proto @@ -0,0 +1,15 @@ +package NYT.NProto; + +option java_package = "tech.ytsaurus"; +option java_multiple_files = true; + +option go_package = "a.yandex-team.ru/yt/go/proto/core/misc"; + +//////////////////////////////////////////////////////////////////////////////// + +message THyperLogLog +{ + required bytes data = 1; +} + +//////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/core/ya.make b/yt/yt_proto/yt/core/ya.make index cc684cf8c9e..306c87f7c88 100644 --- a/yt/yt_proto/yt/core/ya.make +++ b/yt/yt_proto/yt/core/ya.make @@ -8,6 +8,7 @@ SRCS( misc/proto/bloom_filter.proto misc/proto/error.proto misc/proto/guid.proto + misc/proto/hyperloglog.proto misc/proto/protobuf_helpers.proto tracing/proto/span.proto |