aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-03-11 11:46:39 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-03-11 11:46:39 +0300
commit836badeb1d0da3c2cbe75cee3db353fb18b6e688 (patch)
tree1a34ab8485b93492d63892e26b5c387d889c5fe4
parentb4b93eacdcfde94f6619d7633278097f475c50f2 (diff)
downloadydb-836badeb1d0da3c2cbe75cee3db353fb18b6e688.tar.gz
correct sharding
-rw-r--r--ydb/core/tx/sharding/sharding.cpp50
-rw-r--r--ydb/core/tx/sharding/sharding.h49
2 files changed, 73 insertions, 26 deletions
diff --git a/ydb/core/tx/sharding/sharding.cpp b/ydb/core/tx/sharding/sharding.cpp
index a3f006db34c..4e2432a0d04 100644
--- a/ydb/core/tx/sharding/sharding.cpp
+++ b/ydb/core/tx/sharding/sharding.cpp
@@ -96,8 +96,17 @@ TString TShardingBase::DebugString() const {
return "Columns: " + JoinSeq(", ", GetShardingColumns());
}
+std::vector<ui32> TShardingBase::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ std::vector<ui64> hashes = MakeHashes(batch);
+ std::vector<ui32> result;
+ result.reserve(hashes.size());
+ for (auto&& i : hashes) {
+ result.emplace_back(i % ShardsCount);
+ }
+ return result;
+}
-std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+std::vector<ui64> THashSharding::MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const {
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(ShardingColumns.size());
@@ -109,7 +118,7 @@ std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::Recor
columns.emplace_back(array);
}
- std::vector<ui32> out(batch->num_rows());
+ std::vector<ui64> out(batch->num_rows());
TStreamStringHashCalcer hashCalcer(Seed);
@@ -118,17 +127,17 @@ std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::Recor
for (auto& column : columns) {
AppendField(column, row, hashCalcer);
}
- out[row] = hashCalcer.Finish() % ShardsCount;
+ out[row] = hashCalcer.Finish();
}
return out;
}
-ui32 THashSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
+ui64 THashSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
TStreamStringHashCalcer hashCalcer(Seed);
hashCalcer.Start();
readerInfo.BuildStringForHash(value, hashCalcer);
- return hashCalcer.Finish() % ShardsCount;
+ return hashCalcer.Finish();
}
std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const {
@@ -141,6 +150,28 @@ std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::Recor
return {};
}
+ const std::vector<ui64> hashes = MakeHashes(batch);
+ if (hashes.empty()) {
+ return {};
+ }
+
+ auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray);
+ std::vector<ui32> out;
+ out.reserve(batch->num_rows());
+
+ TStreamStringHashCalcer hashCalcer(0);
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ out.emplace_back(ShardNo(tsColumn->Value(row), hashes[row]));
+ }
+
+ return out;
+}
+
+std::vector<ui64> TLogsSharding::MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ if (ShardingColumns.size() < 2) {
+ return {};
+ }
+
std::vector<std::shared_ptr<arrow::Array>> extraColumns;
extraColumns.reserve(ShardingColumns.size() - 1);
@@ -152,8 +183,7 @@ std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::Recor
extraColumns.emplace_back(array);
}
- auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray);
- std::vector<ui32> out;
+ std::vector<ui64> out;
out.reserve(batch->num_rows());
TStreamStringHashCalcer hashCalcer(0);
@@ -162,15 +192,13 @@ std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::Recor
for (auto& column : extraColumns) {
AppendField(column, row, hashCalcer);
}
-
- const ui32 shardNo = ShardNo(tsColumn->Value(row), hashCalcer.Finish());
- out.emplace_back(shardNo);
+ out.emplace_back(hashCalcer.Finish());
}
return out;
}
-ui32 TLogsSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const {
+ui64 TLogsSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const {
YQL_ENSURE(false);
return 0;
}
diff --git a/ydb/core/tx/sharding/sharding.h b/ydb/core/tx/sharding/sharding.h
index 3d0e0350cca..73243cf2cc6 100644
--- a/ydb/core/tx/sharding/sharding.h
+++ b/ydb/core/tx/sharding/sharding.h
@@ -55,6 +55,8 @@ public:
};
class TShardingBase {
+private:
+ YDB_READONLY(ui32, ShardsCount, 0);
public:
using TColumn = TExternalTableColumn;
@@ -64,45 +66,60 @@ public:
static std::unique_ptr<TShardingBase> BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& shardingInfo);
virtual const std::vector<TString>& GetShardingColumns() const = 0;
- virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0;
- virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+ ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
+ return CalcHash(value, readerInfo) % ShardsCount;
+ }
+ virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0;
+
+ virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const;
+ virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
virtual TString DebugString() const;
+ TShardingBase(const ui32 shardsCount)
+ : ShardsCount(shardsCount)
+ {
+
+ }
virtual ~TShardingBase() = default;
};
class THashSharding : public TShardingBase {
private:
- ui32 ShardsCount;
+ using TBase = TShardingBase;
ui64 Seed;
std::vector<TString> ShardingColumns;
public:
THashSharding(ui32 shardsCount, const std::vector<TString>& columnNames, ui64 seed = 0)
- : ShardsCount(shardsCount)
+ : TBase(shardsCount)
, Seed(seed)
, ShardingColumns(columnNames)
{}
- virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+ virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+
+ template <typename T>
+ static ui64 CalcHash(const T value, const ui32 seed = 0) {
+ static_assert(std::is_arithmetic<T>::value);
+ return XXH64(&value, sizeof(value), seed);
+ }
template <typename T>
static ui32 ShardNo(const T value, const ui32 shardsCount, const ui32 seed = 0) {
Y_ASSERT(shardsCount);
- static_assert(std::is_arithmetic<T>::value);
- return XXH64(&value, sizeof(value), seed) % shardsCount;
+ return CalcHash(value, seed) % shardsCount;
}
virtual const std::vector<TString>& GetShardingColumns() const override {
return ShardingColumns;
}
- virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
+ virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
};
// KIKIMR-11529
class TLogsSharding : public TShardingBase {
private:
- ui32 ShardsCount;
+ using TBase = TShardingBase;
ui32 NumActive;
ui64 TsMin;
ui64 ChangePeriod;
@@ -112,8 +129,8 @@ public:
static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5);
TLogsSharding(ui32 shardsCountTotal, const std::vector<TString>& columnNames, ui32 shardsCountActive, TDuration changePeriod = DEFAULT_CHANGE_PERIOD)
- : ShardsCount(shardsCountTotal)
- , NumActive(Min<ui32>(shardsCountActive, ShardsCount))
+ : TBase(shardsCountTotal)
+ , NumActive(Min<ui32>(shardsCountActive, GetShardsCount()))
, TsMin(0)
, ChangePeriod(changePeriod.MicroSeconds())
, ShardingColumns(columnNames)
@@ -127,16 +144,18 @@ public:
// tabletId = tabletIds[shardNo];
ui32 ShardNo(ui64 timestamp, const ui64 uidHash) const {
ui32 tsInterval = (timestamp - TsMin) / ChangePeriod;
- ui32 numIntervals = ShardsCount / NumActive;
- return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % ShardsCount;
+ ui32 numIntervals = GetShardsCount() / NumActive;
+ return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % GetShardsCount();
}
+ virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+ virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+
virtual const std::vector<TString>& GetShardingColumns() const override {
return ShardingColumns;
}
- virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
- virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+ virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
};