diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-11 11:46:39 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-11 11:46:39 +0300 |
commit | 836badeb1d0da3c2cbe75cee3db353fb18b6e688 (patch) | |
tree | 1a34ab8485b93492d63892e26b5c387d889c5fe4 | |
parent | b4b93eacdcfde94f6619d7633278097f475c50f2 (diff) | |
download | ydb-836badeb1d0da3c2cbe75cee3db353fb18b6e688.tar.gz |
correct sharding
-rw-r--r-- | ydb/core/tx/sharding/sharding.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/sharding/sharding.h | 49 |
2 files changed, 73 insertions, 26 deletions
diff --git a/ydb/core/tx/sharding/sharding.cpp b/ydb/core/tx/sharding/sharding.cpp index a3f006db34c..4e2432a0d04 100644 --- a/ydb/core/tx/sharding/sharding.cpp +++ b/ydb/core/tx/sharding/sharding.cpp @@ -96,8 +96,17 @@ TString TShardingBase::DebugString() const { return "Columns: " + JoinSeq(", ", GetShardingColumns()); } +std::vector<ui32> TShardingBase::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const { + std::vector<ui64> hashes = MakeHashes(batch); + std::vector<ui32> result; + result.reserve(hashes.size()); + for (auto&& i : hashes) { + result.emplace_back(i % ShardsCount); + } + return result; +} -std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const { +std::vector<ui64> THashSharding::MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const { std::vector<std::shared_ptr<arrow::Array>> columns; columns.reserve(ShardingColumns.size()); @@ -109,7 +118,7 @@ std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::Recor columns.emplace_back(array); } - std::vector<ui32> out(batch->num_rows()); + std::vector<ui64> out(batch->num_rows()); TStreamStringHashCalcer hashCalcer(Seed); @@ -118,17 +127,17 @@ std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::Recor for (auto& column : columns) { AppendField(column, row, hashCalcer); } - out[row] = hashCalcer.Finish() % ShardsCount; + out[row] = hashCalcer.Finish(); } return out; } -ui32 THashSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { +ui64 THashSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { TStreamStringHashCalcer hashCalcer(Seed); hashCalcer.Start(); readerInfo.BuildStringForHash(value, hashCalcer); - return hashCalcer.Finish() % ShardsCount; + return hashCalcer.Finish(); } std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const { @@ -141,6 +150,28 @@ std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::Recor return {}; } + const std::vector<ui64> hashes = MakeHashes(batch); + if (hashes.empty()) { + return {}; + } + + auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray); + std::vector<ui32> out; + out.reserve(batch->num_rows()); + + TStreamStringHashCalcer hashCalcer(0); + for (int row = 0; row < batch->num_rows(); ++row) { + out.emplace_back(ShardNo(tsColumn->Value(row), hashes[row])); + } + + return out; +} + +std::vector<ui64> TLogsSharding::MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const { + if (ShardingColumns.size() < 2) { + return {}; + } + std::vector<std::shared_ptr<arrow::Array>> extraColumns; extraColumns.reserve(ShardingColumns.size() - 1); @@ -152,8 +183,7 @@ std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::Recor extraColumns.emplace_back(array); } - auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray); - std::vector<ui32> out; + std::vector<ui64> out; out.reserve(batch->num_rows()); TStreamStringHashCalcer hashCalcer(0); @@ -162,15 +192,13 @@ std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::Recor for (auto& column : extraColumns) { AppendField(column, row, hashCalcer); } - - const ui32 shardNo = ShardNo(tsColumn->Value(row), hashCalcer.Finish()); - out.emplace_back(shardNo); + out.emplace_back(hashCalcer.Finish()); } return out; } -ui32 TLogsSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const { +ui64 TLogsSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const { YQL_ENSURE(false); return 0; } diff --git a/ydb/core/tx/sharding/sharding.h b/ydb/core/tx/sharding/sharding.h index 3d0e0350cca..73243cf2cc6 100644 --- a/ydb/core/tx/sharding/sharding.h +++ b/ydb/core/tx/sharding/sharding.h @@ -55,6 +55,8 @@ public: }; class TShardingBase { +private: + YDB_READONLY(ui32, ShardsCount, 0); public: using TColumn = TExternalTableColumn; @@ -64,45 +66,60 @@ public: static std::unique_ptr<TShardingBase> BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& shardingInfo); virtual const std::vector<TString>& GetShardingColumns() const = 0; - virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0; - virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; + ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { + return CalcHash(value, readerInfo) % ShardsCount; + } + virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0; + + virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const; + virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; virtual TString DebugString() const; + TShardingBase(const ui32 shardsCount) + : ShardsCount(shardsCount) + { + + } virtual ~TShardingBase() = default; }; class THashSharding : public TShardingBase { private: - ui32 ShardsCount; + using TBase = TShardingBase; ui64 Seed; std::vector<TString> ShardingColumns; public: THashSharding(ui32 shardsCount, const std::vector<TString>& columnNames, ui64 seed = 0) - : ShardsCount(shardsCount) + : TBase(shardsCount) , Seed(seed) , ShardingColumns(columnNames) {} - virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + + template <typename T> + static ui64 CalcHash(const T value, const ui32 seed = 0) { + static_assert(std::is_arithmetic<T>::value); + return XXH64(&value, sizeof(value), seed); + } template <typename T> static ui32 ShardNo(const T value, const ui32 shardsCount, const ui32 seed = 0) { Y_ASSERT(shardsCount); - static_assert(std::is_arithmetic<T>::value); - return XXH64(&value, sizeof(value), seed) % shardsCount; + return CalcHash(value, seed) % shardsCount; } virtual const std::vector<TString>& GetShardingColumns() const override { return ShardingColumns; } - virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; + virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; }; // KIKIMR-11529 class TLogsSharding : public TShardingBase { private: - ui32 ShardsCount; + using TBase = TShardingBase; ui32 NumActive; ui64 TsMin; ui64 ChangePeriod; @@ -112,8 +129,8 @@ public: static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5); TLogsSharding(ui32 shardsCountTotal, const std::vector<TString>& columnNames, ui32 shardsCountActive, TDuration changePeriod = DEFAULT_CHANGE_PERIOD) - : ShardsCount(shardsCountTotal) - , NumActive(Min<ui32>(shardsCountActive, ShardsCount)) + : TBase(shardsCountTotal) + , NumActive(Min<ui32>(shardsCountActive, GetShardsCount())) , TsMin(0) , ChangePeriod(changePeriod.MicroSeconds()) , ShardingColumns(columnNames) @@ -127,16 +144,18 @@ public: // tabletId = tabletIds[shardNo]; ui32 ShardNo(ui64 timestamp, const ui64 uidHash) const { ui32 tsInterval = (timestamp - TsMin) / ChangePeriod; - ui32 numIntervals = ShardsCount / NumActive; - return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % ShardsCount; + ui32 numIntervals = GetShardsCount() / NumActive; + return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % GetShardsCount(); } + virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + virtual const std::vector<TString>& GetShardingColumns() const override { return ShardingColumns; } - virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; - virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; }; |