aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-03-02 12:20:39 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-03-02 12:20:39 +0300
commitc79a769325b2b08e32db32199488f0a03457481d (patch)
treec3396b76d94d395419357a3464185d9e88d9ae15
parent0ae7543b3e35182cf8617606040b606cd595232a (diff)
downloadydb-c79a769325b2b08e32db32199488f0a03457481d.tar.gz
distribute data by shards on upsert for cs
-rw-r--r--ydb/core/formats/sharding.h189
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux.txt1
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp27
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/common/kqp_resolve.h24
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp42
-rw-r--r--ydb/core/kqp/executer_actor/kqp_table_resolver.cpp74
-rw-r--r--ydb/core/tx/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/sharding/CMakeLists.darwin.txt28
-rw-r--r--ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt29
-rw-r--r--ydb/core/tx/sharding/CMakeLists.linux.txt29
-rw-r--r--ydb/core/tx/sharding/CMakeLists.txt15
-rw-r--r--ydb/core/tx/sharding/hash.cpp5
-rw-r--r--ydb/core/tx/sharding/hash.h13
-rw-r--r--ydb/core/tx/sharding/sharding.cpp178
-rw-r--r--ydb/core/tx/sharding/sharding.h143
-rw-r--r--ydb/core/tx/sharding/ut/CMakeLists.darwin.txt82
-rw-r--r--ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt84
-rw-r--r--ydb/core/tx/sharding/ut/CMakeLists.linux.txt86
-rw-r--r--ydb/core/tx/sharding/ut/CMakeLists.txt15
-rw-r--r--ydb/core/tx/sharding/ut/ut_sharding.cpp32
-rw-r--r--ydb/core/tx/sharding/xx_hash.cpp17
-rw-r--r--ydb/core/tx/sharding/xx_hash.h25
-rw-r--r--ydb/services/ydb/ydb_long_tx_ut.cpp6
30 files changed, 911 insertions, 241 deletions
diff --git a/ydb/core/formats/sharding.h b/ydb/core/formats/sharding.h
deleted file mode 100644
index ea9b6c97b2f..00000000000
--- a/ydb/core/formats/sharding.h
+++ /dev/null
@@ -1,189 +0,0 @@
-#pragma once
-#include "arrow_helpers.h"
-
-#include <contrib/libs/xxhash/xxhash.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
-
-#include <type_traits>
-
-namespace NKikimr::NArrow {
-
-class TShardingBase {
-protected:
- static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) {
- NArrow::SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using T = typename TWrap::T;
- using TArray = typename arrow::TypeTraits<T>::ArrayType;
-
- if (!array->IsNull(row)) {
- auto& typedArray = static_cast<const TArray&>(*array);
- auto value = typedArray.GetView(row);
- if constexpr (arrow::has_string_view<T>()) {
- concat.append(value.data(), value.size());
- } else if constexpr (arrow::has_c_type<T>()) {
- if constexpr (arrow::is_physical_integer_type<T>()) {
- concat.append(reinterpret_cast<const char*>(&value), sizeof(value));
- } else {
- // Do not use bool or floats for sharding
- static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
- }
- } else {
- static_assert(arrow::is_decimal_type<T>());
- }
- }
- return true;
- });
- }
-};
-
-class THashSharding : public TShardingBase {
-public:
- THashSharding(ui32 shardsCount, ui64 seed = 0)
- : ShardsCount(shardsCount)
- , Seed(seed)
- {}
-
- std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch,
- const TVector<TString>& shardingColumns) const
- {
- std::vector<std::shared_ptr<arrow::Array>> columns;
- columns.reserve(shardingColumns.size());
-
- for (auto& colName : shardingColumns) {
- auto array = batch->GetColumnByName(colName);
- if (!array) {
- return {};
- }
- columns.emplace_back(array);
- }
-
- std::vector<ui32> out(batch->num_rows());
-
- if (columns.size() == 1) {
- auto& array = columns[0];
- SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
-
- auto& column = static_cast<const TArray&>(*array);
-
- for (int row = 0; row < batch->num_rows(); ++row) {
- out[row] = ShardNo(column.GetView(row));
- }
- return true;
- });
- } else {
- std::string concat;
- for (int row = 0; row < batch->num_rows(); ++row) {
- concat.clear();
- for (auto& column : columns) {
- AppendField(column, row, concat);
- }
-
- out[row] = ShardNo(concat);
- }
- }
-
- return out;
- }
-
- template <typename T, std::enable_if_t<std::is_arithmetic<T>::value, bool> = true>
- ui32 ShardNo(T value) const {
- return XXH64(&value, sizeof(value), Seed) % ShardsCount;
- }
-
-private:
- ui32 ShardsCount;
- ui64 Seed;
-
- ui32 ShardNo(arrow::util::string_view value) const {
- return XXH64(value.data(), value.size(), Seed) % ShardsCount;
- }
-};
-
-// KIKIMR-11529
-class TLogsSharding : public TShardingBase {
-public:
- static constexpr ui32 DEFAULT_ACITVE_SHARDS = 10;
- static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5);
-
- // TODO
- TLogsSharding(ui32 shardsCountTotal, ui32 shardsCountActive = DEFAULT_ACITVE_SHARDS, TDuration changePeriod = DEFAULT_CHANGE_PERIOD)
- : ShardsCount(shardsCountTotal)
- , NumActive(Min<ui32>(shardsCountActive, ShardsCount))
- , TsMin(0)
- , ChangePeriod(changePeriod.MicroSeconds())
- {}
-
- // tsMin = GetTsMin(tabletIdsMap, timestamp);
- // tabletIds = GetTableIdsByTs(tabletIdsMap, timestamp);
- // numIntervals = tabletIds.size() / nActive;
- // tsInterval = (timestamp - tsMin) / changePeriod;
- // shardNo = (hash(uid) % nActive) + (tsInterval % numIntervals) * nActive;
- // tabletId = tabletIds[shardNo];
- ui32 ShardNo(ui64 timestamp, arrow::util::string_view uid) const {
- ui64 uidHash = XXH64(uid.data(), uid.size(), 0);
- ui32 tsInterval = (timestamp - TsMin) / ChangePeriod;
- ui32 numIntervals = ShardsCount / NumActive;
- return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % ShardsCount;
- }
-
- std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch,
- const TVector<TString>& shardingColumns) const
- {
- if (shardingColumns.size() < 2) {
- return {};
- }
-
- auto tsArray = batch->GetColumnByName(shardingColumns[0]);
- if (!tsArray || tsArray->type_id() != arrow::Type::TIMESTAMP) {
- return {};
- }
-
- std::vector<std::shared_ptr<arrow::Array>> extraColumns;
- extraColumns.reserve(shardingColumns.size() - 1);
-
- for (size_t i = 1; i < shardingColumns.size(); ++i) {
- auto array = batch->GetColumnByName(shardingColumns[i]);
- if (!array) {
- return {};
- }
- extraColumns.emplace_back(array);
- }
-
- auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray);
- std::vector<ui32> out;
- out.reserve(batch->num_rows());
-
- if (extraColumns.size() == 1 && extraColumns[0]->type_id() == arrow::Type::STRING) {
- auto column = std::static_pointer_cast<arrow::StringArray>(extraColumns[0]);
-
- for (int row = 0; row < batch->num_rows(); ++row) {
- ui32 shardNo = ShardNo(tsColumn->Value(row), column->GetView(row));
- out.emplace_back(shardNo);
- }
- } else {
- std::string concat;
- for (int row = 0; row < batch->num_rows(); ++row) {
- concat.clear();
- for (auto& column : extraColumns) {
- AppendField(column, row, concat);
- }
-
- ui32 shardNo = ShardNo(tsColumn->Value(row), concat);
- out.emplace_back(shardNo);
- }
- }
-
- return out;
- }
-
-private:
- ui32 ShardsCount;
- ui32 NumActive;
- ui64 TsMin;
- ui64 ChangePeriod;
-};
-
-}
diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt
index 3dc6b63a011..7b1e0c5eedb 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin.txt
@@ -39,6 +39,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-sys_view
ydb-core-tx
core-tx-datashard
+ core-tx-sharding
tx-long_tx_service-public
ydb-core-ydb_convert
yq-libs-actors
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index bb6c3dc35af..76a073859fb 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-sys_view
ydb-core-tx
core-tx-datashard
+ core-tx-sharding
tx-long_tx_service-public
ydb-core-ydb_convert
yq-libs-actors
diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt
index bb6c3dc35af..76a073859fb 100644
--- a/ydb/core/grpc_services/CMakeLists.linux.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux.txt
@@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-sys_view
ydb-core-tx
core-tx-datashard
+ core-tx-sharding
tx-long_tx_service-public
ydb-core-ydb_convert
yq-libs-actors
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 5e548fbc8ca..a19151fa2c9 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -9,7 +9,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/formats/arrow_helpers.h>
-#include <ydb/core/formats/sharding.h>
+#include <ydb/core/tx/sharding/sharding.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/columnshard/columnshard.h>
@@ -97,10 +97,8 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
Y_VERIFY(description.HasSharding() && description.GetSharding().HasHashSharding());
auto& descSharding = description.GetSharding();
- auto& hashSharding = descSharding.GetHashSharding();
TVector<ui64> tabletIds(descSharding.GetColumnShards().begin(), descSharding.GetColumnShards().end());
- TVector<TString> shardingColumns(hashSharding.GetColumns().begin(), hashSharding.GetColumns().end());
ui32 numShards = tabletIds.size();
Y_VERIFY(numShards);
TFullSplitData result(numShards);
@@ -111,28 +109,15 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
return result;
}
+ auto sharding = NSharding::TShardingBase::BuildShardingOperator(descSharding);
std::vector<ui32> rowSharding;
- if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N) {
- NArrow::THashSharding sharding(numShards);
- rowSharding = sharding.MakeSharding(batch, shardingColumns);
- } else if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS) {
- ui32 activeShards = NArrow::TLogsSharding::DEFAULT_ACITVE_SHARDS;
- if (hashSharding.HasActiveShardsCount()) {
- activeShards = hashSharding.GetActiveShardsCount();
- }
- NArrow::TLogsSharding sharding(numShards, activeShards);
- rowSharding = sharding.MakeSharding(batch, shardingColumns);
+ if (sharding) {
+ rowSharding = sharding->MakeSharding(batch);
}
-
if (rowSharding.empty()) {
result.ErrorString = "empty "
- + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(hashSharding.GetFunction())
- + " sharding";
- for (auto& column : shardingColumns) {
- if (batch->schema()->GetFieldIndex(column) < 0) {
- result.ErrorString += ", no column '" + column + "'";
- }
- }
+ + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(descSharding.GetHashSharding().GetFunction())
+ + " sharding (" + (sharding ? sharding->DebugString() : "no sharding object") + ")";
return result;
}
diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt
index e8d477e1826..ace4ec9165e 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin.txt
@@ -19,6 +19,7 @@ target_link_libraries(core-kqp-common PUBLIC
core-kqp-expr_nodes
core-kqp-provider
tx-long_tx_service-public
+ core-tx-sharding
yql-dq-expr_nodes
ydb-library-aclib
yql-core-issue
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index f1f616100c8..81a8be2dc08 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC
core-kqp-expr_nodes
core-kqp-provider
tx-long_tx_service-public
+ core-tx-sharding
yql-dq-expr_nodes
ydb-library-aclib
yql-core-issue
diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt
index f1f616100c8..81a8be2dc08 100644
--- a/ydb/core/kqp/common/CMakeLists.linux.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux.txt
@@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC
core-kqp-expr_nodes
core-kqp-provider
tx-long_tx_service-public
+ core-tx-sharding
yql-dq-expr_nodes
ydb-library-aclib
yql-core-issue
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h
index 542e9e03384..ffc5eb7f577 100644
--- a/ydb/core/kqp/common/kqp_resolve.h
+++ b/ydb/core/kqp/common/kqp_resolve.h
@@ -1,9 +1,11 @@
#pragma once
#include <ydb/core/engine/mkql_keys.h>
+#include <ydb/core/tx/sharding/sharding.h>
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
#include <ydb/core/protos/kqp_physical.pb.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/yql/minikql/mkql_node.h>
@@ -21,18 +23,30 @@ enum class ETableKind {
class TKqpTableKeys {
public:
- struct TColumn {
- ui32 Id;
- NScheme::TTypeInfo Type;
- TString TypeMod;
- };
+ using TColumn = NSharding::TShardingBase::TColumn;
struct TTable {
+ public:
TString Path;
TMap<TString, TColumn> Columns;
TVector<TString> KeyColumns;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
ETableKind TableKind = ETableKind::Unknown;
+ TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> ColumnTableInfo;
+
+ const TMap<TString, TColumn>& GetColumnsRemap() const {
+ return Columns;
+ }
+
+ std::unique_ptr<NSharding::TShardingBase> BuildSharding() const {
+ if (ColumnTableInfo) {
+ auto result = NSharding::TShardingBase::BuildShardingOperator(ColumnTableInfo->Description.GetSharding());
+ YQL_ENSURE(result);
+ return result;
+ } else {
+ return nullptr;
+ }
+ }
};
TTable* FindTablePtr(const TTableId& id) {
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index 53a74080590..d609608dd45 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -57,18 +57,36 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
}
NUdf::TUnboxedValue paramValue;
+ std::unique_ptr<NSharding::TShardingBase> sharding = table.BuildSharding();
+ std::unique_ptr<NSharding::TUnboxedValueReader> unboxedReader;
+ if (sharding) {
+ unboxedReader = std::make_unique<NSharding::TUnboxedValueReader>(structType, table.GetColumnsRemap(), sharding->GetShardingColumns());
+ }
auto it = value.GetListIterator();
while (it.Next(paramValue)) {
- auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices,
- typeEnv, /* copyValues */ true);
- Y_VERIFY_DEBUG(keyValue.size() == keyLen);
-
- ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes,
- [] (const auto& partition) { return *partition.Range; });
+ ui64 shardId = 0;
+ if (sharding) {
+ shardId = key.GetPartitions()[sharding->CalcShardId(paramValue, *unboxedReader)].ShardId;
+ } else {
+ auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices,
+ typeEnv, /* copyValues */ true);
+ Y_VERIFY_DEBUG(keyValue.size() == keyLen);
+ const ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes,
+ [](const auto& partition) { return *partition.Range; });
- auto point = TSerializedCellVec(TSerializedCellVec::Serialize(keyValue));
+ shardId = key.GetPartitions()[partitionIndex].ShardId;
- ui64 shardId = key.GetPartitions()[partitionIndex].ShardId;
+ auto point = TSerializedCellVec(TSerializedCellVec::Serialize(keyValue));
+ auto& shardData = ret[shardId];
+ if (key.GetPartitions()[partitionIndex].Range->IsPoint) {
+ // singular case when partition is just a point
+ shardData.FullRange.emplace(TSerializedTableRange(point.GetBuffer(), "", true, true));
+ shardData.FullRange->Point = true;
+ shardData.Ranges.clear();
+ } else {
+ shardData.Ranges.emplace_back(std::move(point));
+ }
+ }
auto& shardData = ret[shardId];
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
@@ -87,14 +105,6 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
shardParamValues[shardId].emplace_back(std::move(paramValue));
- if (key.GetPartitions()[partitionIndex].Range->IsPoint) {
- // singular case when partition is just a point
- shardData.FullRange.emplace(TSerializedTableRange(point.GetBuffer(), "", true, true));
- shardData.FullRange->Point = true;
- shardData.Ranges.clear();
- } else {
- shardData.Ranges.emplace_back(std::move(point));
- }
shardData.ParamType = itemType;
}
diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
index 6cdcf4bd38b..f9dabdf25cf 100644
--- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
@@ -159,6 +159,7 @@ private:
STATEFN(ResolveKeysState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolveKeys);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveKeys);
hFunc(TEvents::TEvPoison, HandleResolveKeys);
default: {
LOG_C("ResolveKeysState: unexpected event " << ev->GetTypeRewrite());
@@ -167,6 +168,43 @@ private:
}
}
+ void HandleResolveKeys(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ if (ShouldTerminate) {
+ PassAway();
+ return;
+ }
+ auto& results = ev->Get()->Request->ResultSet;
+ if (results.size() != TableRequestIds.size()) {
+ ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, TIssue(TStringBuilder() << "navigation problems for tables"));
+ return;
+ }
+ LOG_D("Navigated key sets: " << results.size());
+ for (auto& entry : results) {
+ if (!TableRequestIds.erase(entry.TableId)) {
+ ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
+ YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
+ << "Incorrect tableId in reply " << entry.TableId << '.'));
+ return;
+ }
+ if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
+ YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
+ << "Failed to resolve table " << entry.TableId << " keys: " << entry.Status << '.'));
+ return;
+ }
+ auto* table = TableKeys.FindTablePtr(entry.TableId);
+ if (!table) {
+ ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
+ YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
+ << "Incorrect tableId in table keys " << entry.TableId << '.'));
+ return;
+ }
+ table->ColumnTableInfo = entry.ColumnTableInfo;
+ }
+ NavigationFinished = true;
+ TryFinish();
+ }
+
void HandleResolveKeys(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev) {
if (ShouldTerminate) {
PassAway();
@@ -215,12 +253,8 @@ private:
}
timer.reset();
-
- auto replyEv = std::make_unique<TEvKqpExecuter::TEvTableResolveStatus>();
- replyEv->CpuTime = CpuTime;
-
- Send(Owner, replyEv.release());
- PassAway();
+ ResolvingFinished = true;
+ TryFinish();
}
void HandleResolveKeys(TEvents::TEvPoison::TPtr&) {
@@ -231,6 +265,7 @@ private:
void ResolveKeys() {
FillKqpTasksGraphStages(TasksGraph, Transactions);
+ auto requestNavigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->ResultSet.reserve(TasksGraph.GetStagesInfo().size());
if (UserToken) {
@@ -249,6 +284,13 @@ private:
stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, operation);
+ if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.emplace(stageInfo.Meta.TableId).second) {
+ auto& entry = requestNavigate->ResultSet.emplace_back();
+ entry.TableId = stageInfo.Meta.TableId;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
+ }
+
auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey));
entry.UserData = EncodeStageInfo(stageInfo);
switch (operation) {
@@ -266,7 +308,11 @@ private:
}
}
}
-
+ if (requestNavigate->ResultSet.size()) {
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(requestNavigate.release()));
+ } else {
+ NavigationFinished = true;
+ }
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
@@ -310,12 +356,26 @@ private:
PassAway();
}
+ void TryFinish() {
+ if (!NavigationFinished || !ResolvingFinished) {
+ return;
+ }
+ auto replyEv = std::make_unique<TEvKqpExecuter::TEvTableResolveStatus>();
+ replyEv->CpuTime = CpuTime;
+
+ Send(Owner, replyEv.release());
+ PassAway();
+ }
+
private:
const TActorId Owner;
const ui64 TxId;
const TMaybe<TString> UserToken;
const TVector<IKqpGateway::TPhysicalTxData>& Transactions;
TKqpTableKeys& TableKeys;
+ THashSet<TTableId> TableRequestIds;
+ bool NavigationFinished = false;
+ bool ResolvingFinished = false;
// TODO: TableResolver should not populate TasksGraph as it's not related to its job (bad API).
TKqpTasksGraph& TasksGraph;
diff --git a/ydb/core/tx/CMakeLists.darwin.txt b/ydb/core/tx/CMakeLists.darwin.txt
index ceb4de0b6fc..e4fe0eea1fc 100644
--- a/ydb/core/tx/CMakeLists.darwin.txt
+++ b/ydb/core/tx/CMakeLists.darwin.txt
@@ -18,6 +18,7 @@ add_subdirectory(scheme_cache)
add_subdirectory(schemeshard)
add_subdirectory(sequenceproxy)
add_subdirectory(sequenceshard)
+add_subdirectory(sharding)
add_subdirectory(tiering)
add_subdirectory(time_cast)
add_subdirectory(tx_allocator)
diff --git a/ydb/core/tx/CMakeLists.linux-aarch64.txt b/ydb/core/tx/CMakeLists.linux-aarch64.txt
index 8f7572568bf..f2f8b8e3e5a 100644
--- a/ydb/core/tx/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/CMakeLists.linux-aarch64.txt
@@ -18,6 +18,7 @@ add_subdirectory(scheme_cache)
add_subdirectory(schemeshard)
add_subdirectory(sequenceproxy)
add_subdirectory(sequenceshard)
+add_subdirectory(sharding)
add_subdirectory(tiering)
add_subdirectory(time_cast)
add_subdirectory(tx_allocator)
diff --git a/ydb/core/tx/CMakeLists.linux.txt b/ydb/core/tx/CMakeLists.linux.txt
index 8f7572568bf..f2f8b8e3e5a 100644
--- a/ydb/core/tx/CMakeLists.linux.txt
+++ b/ydb/core/tx/CMakeLists.linux.txt
@@ -18,6 +18,7 @@ add_subdirectory(scheme_cache)
add_subdirectory(schemeshard)
add_subdirectory(sequenceproxy)
add_subdirectory(sequenceshard)
+add_subdirectory(sharding)
add_subdirectory(tiering)
add_subdirectory(time_cast)
add_subdirectory(tx_allocator)
diff --git a/ydb/core/tx/sharding/CMakeLists.darwin.txt b/ydb/core/tx/sharding/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..cab50904fd5
--- /dev/null
+++ b/ydb/core/tx/sharding/CMakeLists.darwin.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(core-tx-sharding)
+target_compile_options(core-tx-sharding PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-sharding PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-minikql
+ library-yql-utils
+ yql-public-udf
+ ydb-core-formats
+ ydb-core-protos
+)
+target_sources(core-tx-sharding PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+)
diff --git a/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..5e702794935
--- /dev/null
+++ b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(core-tx-sharding)
+target_compile_options(core-tx-sharding PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-sharding PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-minikql
+ library-yql-utils
+ yql-public-udf
+ ydb-core-formats
+ ydb-core-protos
+)
+target_sources(core-tx-sharding PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+)
diff --git a/ydb/core/tx/sharding/CMakeLists.linux.txt b/ydb/core/tx/sharding/CMakeLists.linux.txt
new file mode 100644
index 00000000000..5e702794935
--- /dev/null
+++ b/ydb/core/tx/sharding/CMakeLists.linux.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(core-tx-sharding)
+target_compile_options(core-tx-sharding PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-sharding PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-minikql
+ library-yql-utils
+ yql-public-udf
+ ydb-core-formats
+ ydb-core-protos
+)
+target_sources(core-tx-sharding PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+)
diff --git a/ydb/core/tx/sharding/CMakeLists.txt b/ydb/core/tx/sharding/CMakeLists.txt
new file mode 100644
index 00000000000..5bb4faffb40
--- /dev/null
+++ b/ydb/core/tx/sharding/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/tx/sharding/hash.cpp b/ydb/core/tx/sharding/hash.cpp
new file mode 100644
index 00000000000..b770619203a
--- /dev/null
+++ b/ydb/core/tx/sharding/hash.cpp
@@ -0,0 +1,5 @@
+#include "hash.h"
+
+namespace NKikimr::NSharding {
+
+}
diff --git a/ydb/core/tx/sharding/hash.h b/ydb/core/tx/sharding/hash.h
new file mode 100644
index 00000000000..0f98c6f1602
--- /dev/null
+++ b/ydb/core/tx/sharding/hash.h
@@ -0,0 +1,13 @@
+#pragma once
+#include <util/system/types.h>
+
+namespace NKikimr::NSharding {
+
+class IHashCalcer {
+public:
+ virtual void Update(const ui8* data, const ui32 size) = 0;
+ virtual ui64 Finish() = 0;
+ virtual void Start() = 0;
+};
+
+}
diff --git a/ydb/core/tx/sharding/sharding.cpp b/ydb/core/tx/sharding/sharding.cpp
new file mode 100644
index 00000000000..a3f006db34c
--- /dev/null
+++ b/ydb/core/tx/sharding/sharding.cpp
@@ -0,0 +1,178 @@
+#include "sharding.h"
+#include "xx_hash.h"
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <util/string/join.h>
+
+namespace NKikimr::NSharding {
+
+// Feeds the value at `row` of `array` into `hashCalcer`.
+//  - null values contribute nothing to the hash;
+//  - string-like values contribute the bytes of their view;
+//  - physical integer values contribute their in-memory bytes;
+//  - boolean / floating-point / decimal values contribute nothing
+//    (the static_asserts only pin which types may reach those branches).
+void TShardingBase::AppendField(const std::shared_ptr<arrow::Array>& array, int row, IHashCalcer& hashCalcer) {
+    NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+        using TWrap = std::decay_t<decltype(type)>;
+        using T = typename TWrap::T;
+        using TArray = typename arrow::TypeTraits<T>::ArrayType;
+
+        if (array->IsNull(row)) {
+            return true;
+        }
+
+        const auto& concreteArray = static_cast<const TArray&>(*array);
+        auto value = concreteArray.GetView(row);
+        if constexpr (arrow::has_string_view<T>()) {
+            hashCalcer.Update(reinterpret_cast<const ui8*>(value.data()), value.size());
+        } else if constexpr (arrow::has_c_type<T>()) {
+            if constexpr (arrow::is_physical_integer_type<T>()) {
+                hashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+            } else {
+                // Do not use bool or floats for sharding
+                static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
+            }
+        } else {
+            static_assert(arrow::is_decimal_type<T>());
+        }
+        return true;
+    });
+}
+
+// Feeds every sharding column of the struct `value` into `hashCalcer`,
+// in the order of ColumnsInfo.
+//  - values stored as (boxed) strings contribute their bytes;
+//  - embedded values contribute either their string bytes (String/Utf8
+//    columns) or the bytes of the 16/32/64-bit integer they hold;
+//  - anything else fails with YQL_ENSURE.
+void TUnboxedValueReader::BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const {
+    for (auto&& i : ColumnsInfo) {
+        auto columnValue = value.GetElement(i.Idx);
+        if (columnValue.IsString()) {
+            const auto stringRef = columnValue.AsStringRef();
+            hashCalcer.Update((const ui8*)stringRef.Data(), stringRef.Size());
+        } else if (columnValue.IsEmbedded()) {
+            switch (i.Type.GetTypeId()) {
+                case NScheme::NTypeIds::String:
+                case NScheme::NTypeIds::Utf8: {
+                    // Short strings are stored inline in the value, so they are
+                    // reported as embedded rather than as strings; hash their
+                    // bytes exactly like the boxed-string branch above does.
+                    const auto stringRef = columnValue.AsStringRef();
+                    hashCalcer.Update((const ui8*)stringRef.Data(), stringRef.Size());
+                    continue;
+                }
+                case NScheme::NTypeIds::Uint16:
+                    FieldToHashString<ui16>(columnValue, hashCalcer);
+                    continue;
+                case NScheme::NTypeIds::Uint32:
+                    FieldToHashString<ui32>(columnValue, hashCalcer);
+                    continue;
+                case NScheme::NTypeIds::Uint64:
+                    FieldToHashString<ui64>(columnValue, hashCalcer);
+                    continue;
+                case NScheme::NTypeIds::Int16:
+                    FieldToHashString<i16>(columnValue, hashCalcer);
+                    continue;
+                case NScheme::NTypeIds::Int32:
+                    FieldToHashString<i32>(columnValue, hashCalcer);
+                    continue;
+                case NScheme::NTypeIds::Int64:
+                    FieldToHashString<i64>(columnValue, hashCalcer);
+                    continue;
+            }
+            // Embedded value of a type that is not supported as a sharding key.
+            YQL_ENSURE(false, "incorrect column type for shard calculation");
+        } else {
+            YQL_ENSURE(false, "incorrect column type for shard calculation");
+        }
+    }
+}
+
+// Resolves every sharding column to its description in `columnsRemap` and to
+// its member index inside `structInfo`. Fails (YQL_ENSURE) when the list of
+// sharding columns is empty or a column is missing from `columnsRemap`.
+TUnboxedValueReader::TUnboxedValueReader(const NMiniKQL::TStructType* structInfo,
+    const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns) {
+    YQL_ENSURE(shardingColumns.size());
+    for (const TString& columnName : shardingColumns) {
+        const auto it = columnsRemap.find(columnName);
+        YQL_ENSURE(it != columnsRemap.end());
+        const ui32 memberIdx = structInfo->GetMemberIndex(columnName);
+        ColumnsInfo.emplace_back(it->second, memberIdx, columnName);
+    }
+}
+
+// Creates the sharding operator described by `sharding`.
+// Returns nullptr when no hash sharding is configured or the hash function is
+// neither HASH_FUNCTION_MODULO_N nor HASH_FUNCTION_CLOUD_LOGS.
+std::unique_ptr<TShardingBase> TShardingBase::BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& sharding) {
+    if (!sharding.HasHashSharding()) {
+        return nullptr;
+    }
+
+    using THashProto = NKikimrSchemeOp::TColumnTableSharding::THashSharding;
+    const auto& hashProto = sharding.GetHashSharding();
+    const auto& protoColumns = hashProto.GetColumns();
+    std::vector<TString> columnNames(protoColumns.begin(), protoColumns.end());
+    const auto hashFunction = hashProto.GetFunction();
+
+    if (hashFunction == THashProto::HASH_FUNCTION_MODULO_N) {
+        return std::make_unique<THashSharding>(sharding.GetColumnShards().size(), columnNames, 0);
+    }
+    if (hashFunction == THashProto::HASH_FUNCTION_CLOUD_LOGS) {
+        // The number of active shards is optional in the description.
+        const ui32 activeShards = hashProto.HasActiveShardsCount()
+            ? hashProto.GetActiveShardsCount()
+            : TLogsSharding::DEFAULT_ACITVE_SHARDS;
+        return std::make_unique<TLogsSharding>(sharding.GetColumnShards().size(), columnNames, activeShards);
+    }
+    return nullptr;
+}
+
+// Human-readable description: the sharding column names joined with ", ".
+TString TShardingBase::DebugString() const {
+    const TString joinedColumns = JoinSeq(", ", GetShardingColumns());
+    return "Columns: " + joinedColumns;
+}
+
+
+// Computes the shard index of every row of `batch`: the XX hash of all
+// sharding-column values of the row, modulo ShardsCount.
+// Returns an empty vector when sharding is impossible: there are no shards,
+// or a sharding column is missing from the batch.
+std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+    if (!ShardsCount) {
+        // The modulo below would divide by zero.
+        return {};
+    }
+
+    std::vector<std::shared_ptr<arrow::Array>> columns;
+    columns.reserve(ShardingColumns.size());
+
+    for (const auto& colName : ShardingColumns) {
+        auto array = batch->GetColumnByName(colName);
+        if (!array) {
+            return {};
+        }
+        columns.emplace_back(array);
+    }
+
+    std::vector<ui32> out(batch->num_rows());
+
+    // One calculator is reused for all rows; Start() resets it for each row.
+    TStreamStringHashCalcer hashCalcer(Seed);
+
+    for (int row = 0; row < batch->num_rows(); ++row) {
+        hashCalcer.Start();
+        for (const auto& column : columns) {
+            AppendField(column, row, hashCalcer);
+        }
+        out[row] = hashCalcer.Finish() % ShardsCount;
+    }
+
+    return out;
+}
+
+// Computes the shard index for a single struct value: the XX hash of its
+// sharding columns (as read by `readerInfo`), modulo ShardsCount.
+// Fails with YQL_ENSURE when there are no shards.
+ui32 THashSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
+    // A zero shard count would make the modulo below divide by zero.
+    YQL_ENSURE(ShardsCount, "cannot calculate shard id: shards count is zero");
+    TStreamStringHashCalcer hashCalcer(Seed);
+    hashCalcer.Start();
+    readerInfo.BuildStringForHash(value, hashCalcer);
+    return hashCalcer.Finish() % ShardsCount;
+}
+
+// Computes the shard index of every row of `batch`.
+// The first sharding column must be a TIMESTAMP column; the remaining columns
+// are hashed together (seed 0) and combined with the timestamp by ShardNo().
+// Returns an empty vector when fewer than two sharding columns are configured,
+// the first column is missing or not a timestamp, or any other column is
+// missing from the batch.
+std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+    const size_t columnsCount = ShardingColumns.size();
+    if (columnsCount < 2) {
+        return {};
+    }
+
+    auto tsArray = batch->GetColumnByName(ShardingColumns[0]);
+    const bool isTimestamp = tsArray && tsArray->type_id() == arrow::Type::TIMESTAMP;
+    if (!isTimestamp) {
+        return {};
+    }
+
+    std::vector<std::shared_ptr<arrow::Array>> extraColumns;
+    extraColumns.reserve(columnsCount - 1);
+    for (size_t idx = 1; idx < columnsCount; ++idx) {
+        auto extraArray = batch->GetColumnByName(ShardingColumns[idx]);
+        if (!extraArray) {
+            return {};
+        }
+        extraColumns.emplace_back(extraArray);
+    }
+
+    auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray);
+
+    std::vector<ui32> result;
+    result.reserve(batch->num_rows());
+
+    // One calculator is reused for all rows; Start() resets it for each row.
+    TStreamStringHashCalcer hashCalcer(0);
+    for (int row = 0; row < batch->num_rows(); ++row) {
+        hashCalcer.Start();
+        for (auto& extraColumn : extraColumns) {
+            AppendField(extraColumn, row, hashCalcer);
+        }
+        result.emplace_back(ShardNo(tsColumn->Value(row), hashCalcer.Finish()));
+    }
+
+    return result;
+}
+
+// Per-value shard calculation is not implemented for logs sharding:
+// every call fails the YQL_ENSURE check below.
+ui32 TLogsSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue&, const TUnboxedValueReader&) const {
+    YQL_ENSURE(false);
+    return 0;
+}
+
+}
diff --git a/ydb/core/tx/sharding/sharding.h b/ydb/core/tx/sharding/sharding.h
new file mode 100644
index 00000000000..3d0e0350cca
--- /dev/null
+++ b/ydb/core/tx/sharding/sharding.h
@@ -0,0 +1,143 @@
+#pragma once
+
+#include "hash.h"
+
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+#include <ydb/library/yql/public/udf/udf_value.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
+#include <contrib/libs/xxhash/xxhash.h>
+
+#include <type_traits>
+
+namespace NKikimr::NMiniKQL {
+class TStructType;
+}
+
+namespace NKikimr::NSharding {
+
+// Description of a table column as supplied by the caller of the sharding
+// code (TUnboxedValueReader receives these keyed by column name).
+struct TExternalTableColumn {
+ // presumably the column id in the table schema — TODO confirm
+ ui32 Id;
+ // Scheme type of the column; its type id selects how an embedded value is hashed.
+ NScheme::TTypeInfo Type;
+ // NOTE(review): not read by the sharding code visible here; meaning to be confirmed.
+ TString TypeMod;
+};
+
+// A sharding column description extended with the column's position inside
+// the unboxed struct value and its name.
+struct TColumnUnboxedPlaceInfo: TExternalTableColumn {
+private:
+    using TBase = TExternalTableColumn;
+
+public:
+    // Element index of the column inside the struct value.
+    const ui32 Idx;
+    // Column name.
+    const TString Name;
+
+    TColumnUnboxedPlaceInfo(const TExternalTableColumn& baseInfo, const ui32 idx, const TString& name)
+        : TBase(baseInfo)
+        , Idx(idx)
+        , Name(name)
+    {
+    }
+};
+
+// Knows where the sharding columns live inside an unboxed struct value and
+// feeds their contents into a hash calculator.
+class TUnboxedValueReader {
+private:
+    YDB_READONLY_DEF(std::vector<TColumnUnboxedPlaceInfo>, ColumnsInfo);
+
+    // Reads `value` as an arithmetic T and feeds its in-memory bytes into `hashCalcer`.
+    template <class T>
+    static void FieldToHashString(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) {
+        static_assert(std::is_arithmetic_v<T>);
+        const T typedValue = value.Get<T>();
+        hashCalcer.Update(reinterpret_cast<const ui8*>(&typedValue), sizeof(T));
+    }
+public:
+    // Feeds all sharding columns of the struct `value` into `hashCalcer`.
+    void BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const;
+
+    // Resolves `shardingColumns` against `columnsRemap` and `structInfo`.
+    TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns);
+};
+
+// Base interface of the sharding operators: maps rows (Arrow batches) or
+// single struct values to shard indexes.
+class TShardingBase {
+public:
+    using TColumn = TExternalTableColumn;
+
+    // Builds the operator described by `shardingInfo`; may return nullptr.
+    static std::unique_ptr<TShardingBase> BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& shardingInfo);
+
+    // Names of the columns the shard index is computed from.
+    virtual const std::vector<TString>& GetShardingColumns() const = 0;
+    // Shard index for a single struct value.
+    virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0;
+    // Shard index for every row of `batch`.
+    virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+
+    virtual TString DebugString() const;
+
+    virtual ~TShardingBase() = default;
+
+protected:
+    // Feeds the value at `row` of `array` into `hashCalcer`.
+    static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, IHashCalcer& hashCalcer);
+};
+
+// Sharding by "hash of the sharding columns modulo the number of shards".
+class THashSharding : public TShardingBase {
+private:
+    ui32 ShardsCount;
+    ui64 Seed;
+    std::vector<TString> ShardingColumns;
+public:
+    // shardsCount - total number of shards;
+    // columnNames - columns whose values are hashed, in hashing order;
+    // seed        - seed of the hash function.
+    THashSharding(ui32 shardsCount, const std::vector<TString>& columnNames, ui64 seed = 0)
+        : ShardsCount(shardsCount)
+        , Seed(seed)
+        , ShardingColumns(columnNames)
+    {}
+
+    std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+
+    // Shard index of a single arithmetic value: XXH64 of its bytes modulo
+    // `shardsCount`, which must be non-zero.
+    // The seed is 64-bit, matching both the Seed member and the XXH64 seed
+    // type, so a seed above 32 bits is no longer silently truncated.
+    template <typename T>
+    static ui32 ShardNo(const T value, const ui32 shardsCount, const ui64 seed = 0) {
+        Y_ASSERT(shardsCount);
+        static_assert(std::is_arithmetic<T>::value);
+        return XXH64(&value, sizeof(value), seed) % shardsCount;
+    }
+
+    const std::vector<TString>& GetShardingColumns() const override {
+        return ShardingColumns;
+    }
+
+    ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
+};
+
+// KIKIMR-11529
+// Sharding for log-like data: the timestamp selects a group of NumActive
+// shards (the group changes every ChangePeriod), and the hash of the
+// remaining sharding columns selects a shard inside that group.
+class TLogsSharding : public TShardingBase {
+private:
+    ui32 ShardsCount;
+    ui32 NumActive;
+    ui64 TsMin;
+    ui64 ChangePeriod;
+    const std::vector<TString> ShardingColumns;
+public:
+    static constexpr ui32 DEFAULT_ACITVE_SHARDS = 10;
+    static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5);
+
+    // shardsCountTotal  - total number of shards;
+    // columnNames       - sharding columns, the timestamp column first;
+    // shardsCountActive - shards written simultaneously (capped by the total);
+    // changePeriod      - how often the active group changes.
+    // The number of active shards and the change period (in microseconds) are
+    // clamped to at least 1: ShardNo() divides by both.
+    TLogsSharding(ui32 shardsCountTotal, const std::vector<TString>& columnNames, ui32 shardsCountActive, TDuration changePeriod = DEFAULT_CHANGE_PERIOD)
+        : ShardsCount(shardsCountTotal)
+        , NumActive(Max<ui32>(1, Min<ui32>(shardsCountActive, ShardsCount)))
+        , TsMin(0)
+        , ChangePeriod(Max<ui64>(1, changePeriod.MicroSeconds()))
+        , ShardingColumns(columnNames)
+    {}
+
+    // tsMin = GetTsMin(tabletIdsMap, timestamp);
+    // tabletIds = GetTableIdsByTs(tabletIdsMap, timestamp);
+    // numIntervals = tabletIds.size() / nActive;
+    // tsInterval = (timestamp - tsMin) / changePeriod;
+    // shardNo = (hash(uid) % nActive) + (tsInterval % numIntervals) * nActive;
+    // tabletId = tabletIds[shardNo];
+    //
+    // Requires ShardsCount != 0. With a non-zero ShardsCount the constructor
+    // guarantees 1 <= NumActive <= ShardsCount, hence numIntervals >= 1.
+    ui32 ShardNo(ui64 timestamp, const ui64 uidHash) const {
+        Y_ASSERT(ShardsCount);
+        // Narrowed to 32 bits exactly as before, so shard assignment is unchanged.
+        ui32 tsInterval = (timestamp - TsMin) / ChangePeriod;
+        ui32 numIntervals = ShardsCount / NumActive;
+        return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % ShardsCount;
+    }
+
+    const std::vector<TString>& GetShardingColumns() const override {
+        return ShardingColumns;
+    }
+    ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
+
+    std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+
+};
+
+}
diff --git a/ydb/core/tx/sharding/ut/CMakeLists.darwin.txt b/ydb/core/tx/sharding/ut/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..c60bb0dd917
--- /dev/null
+++ b/ydb/core/tx/sharding/ut/CMakeLists.darwin.txt
@@ -0,0 +1,82 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+# Unit-test executable for ydb/core/tx/sharding (macOS build description).
+add_executable(ydb-core-tx-sharding-ut)
+target_compile_options(ydb-core-tx-sharding-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+# NOTE(review): this include directory and the core-tx-tiering dependency below
+# refer to tx/tiering rather than tx/sharding — possibly carried over from
+# another test target; confirm they are needed.
+target_include_directories(ydb-core-tx-sharding-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering
+)
+target_link_libraries(ydb-core-tx-sharding-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-system
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-tiering
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ core-tx-sharding
+ public-lib-yson_value
+)
+# NOTE(review): -fPIC is listed twice; presumably harmless generator output.
+target_link_options(ydb-core-tx-sharding-ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+# The only source of the test binary.
+target_sources(ydb-core-tx-sharding-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/ut/ut_sharding.cpp
+)
+# SPLIT_FACTOR is a custom target property; presumably consumed by the test
+# runner to split the suite into chunks — TODO confirm.
+set_property(
+ TARGET
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+# Registers the executable as a test with the listed command-line arguments.
+add_yunittest(
+ NAME
+ ydb-core-tx-sharding-ut
+ TEST_TARGET
+ ydb-core-tx-sharding-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+# Test metadata: label, processor count and timeout (presumably in seconds).
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-core-tx-sharding-ut)
diff --git a/ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..64f6dac1437
--- /dev/null
+++ b/ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,84 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+# Unit-test executable for ydb/core/tx/sharding (Linux aarch64 build description).
+add_executable(ydb-core-tx-sharding-ut)
+target_compile_options(ydb-core-tx-sharding-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+# NOTE(review): this include directory and the core-tx-tiering dependency below
+# refer to tx/tiering rather than tx/sharding — possibly carried over from
+# another test target; confirm they are needed.
+target_include_directories(ydb-core-tx-sharding-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering
+)
+target_link_libraries(ydb-core-tx-sharding-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ cpp-testing-unittest_main
+ core-tx-tiering
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ core-tx-sharding
+ public-lib-yson_value
+)
+# NOTE(review): -fPIC, -lrt and -ldl are each listed twice; presumably harmless
+# generator output.
+target_link_options(ydb-core-tx-sharding-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+# The only source of the test binary.
+target_sources(ydb-core-tx-sharding-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/ut/ut_sharding.cpp
+)
+# SPLIT_FACTOR is a custom target property; presumably consumed by the test
+# runner to split the suite into chunks — TODO confirm.
+set_property(
+ TARGET
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+# Registers the executable as a test with the listed command-line arguments.
+add_yunittest(
+ NAME
+ ydb-core-tx-sharding-ut
+ TEST_TARGET
+ ydb-core-tx-sharding-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+# Test metadata: label, processor count and timeout (presumably in seconds).
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-core-tx-sharding-ut)
diff --git a/ydb/core/tx/sharding/ut/CMakeLists.linux.txt b/ydb/core/tx/sharding/ut/CMakeLists.linux.txt
new file mode 100644
index 00000000000..aa8b0faf5eb
--- /dev/null
+++ b/ydb/core/tx/sharding/ut/CMakeLists.linux.txt
@@ -0,0 +1,86 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+# Unit-test executable for ydb/core/tx/sharding (Linux x86_64 build description).
+add_executable(ydb-core-tx-sharding-ut)
+target_compile_options(ydb-core-tx-sharding-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+# NOTE(review): this include directory and the core-tx-tiering dependency below
+# refer to tx/tiering rather than tx/sharding — possibly carried over from
+# another test target; confirm they are needed.
+target_include_directories(ydb-core-tx-sharding-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering
+)
+target_link_libraries(ydb-core-tx-sharding-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-tiering
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ core-tx-sharding
+ public-lib-yson_value
+)
+# NOTE(review): -fPIC, -lrt and -ldl are each listed twice; presumably harmless
+# generator output.
+target_link_options(ydb-core-tx-sharding-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+# The only source of the test binary.
+target_sources(ydb-core-tx-sharding-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/ut/ut_sharding.cpp
+)
+# SPLIT_FACTOR is a custom target property; presumably consumed by the test
+# runner to split the suite into chunks — TODO confirm.
+set_property(
+ TARGET
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+# Registers the executable as a test with the listed command-line arguments.
+add_yunittest(
+ NAME
+ ydb-core-tx-sharding-ut
+ TEST_TARGET
+ ydb-core-tx-sharding-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+# Test metadata: label, processor count and timeout (presumably in seconds).
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-sharding-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-core-tx-sharding-ut)
diff --git a/ydb/core/tx/sharding/ut/CMakeLists.txt b/ydb/core/tx/sharding/ut/CMakeLists.txt
new file mode 100644
index 00000000000..5bb4faffb40
--- /dev/null
+++ b/ydb/core/tx/sharding/ut/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+# Include the generated platform-specific description of the unit-test target
+# that matches the target CPU/OS. There is no fallback branch: on any other
+# platform nothing is included from this directory.
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/tx/sharding/ut/ut_sharding.cpp b/ydb/core/tx/sharding/ut/ut_sharding.cpp
new file mode 100644
index 00000000000..7e8ae24199b
--- /dev/null
+++ b/ydb/core/tx/sharding/ut/ut_sharding.cpp
@@ -0,0 +1,32 @@
+#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/tx/sharding/sharding.h>
+#include <ydb/core/tx/sharding/xx_hash.h>
+
+#include <library/cpp/actors/core/av_bootstrapped.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+Y_UNIT_TEST_SUITE(Sharding) {
+
+    // Checks that the streaming calculator agrees with one-shot XXH64 over the
+    // same bytes: after every appended random byte the streamed digest must
+    // equal the hash of the whole accumulated string. The calculator is
+    // restarted for each of the 9 rounds.
+    Y_UNIT_TEST(XXUsage) {
+        NSharding::TStreamStringHashCalcer hCalcer(0);
+        for (ui32 round = 1; round < 10; ++round) {
+            TString ss;
+            hCalcer.Start();
+            for (ui32 step = 0; step < 10000; ++step) {
+                const ui8 randomByte = RandomNumber<ui8>();
+                hCalcer.Update(&randomByte, 1);
+                ss += static_cast<char>(randomByte);
+                UNIT_ASSERT(hCalcer.Finish() == XXH64(ss.data(), ss.size(), 0));
+                const bool reportProgress = (step % 1000 == 0);
+                if (reportProgress) {
+                    Cerr << hCalcer.Finish() << Endl;
+                }
+            }
+        }
+    }
+}
+}
diff --git a/ydb/core/tx/sharding/xx_hash.cpp b/ydb/core/tx/sharding/xx_hash.cpp
new file mode 100644
index 00000000000..d5e0c416977
--- /dev/null
+++ b/ydb/core/tx/sharding/xx_hash.cpp
@@ -0,0 +1,17 @@
+#include "xx_hash.h"
+
+namespace NKikimr::NSharding {
+
+// Begins a new hash computation: resets HashState using the seed given at construction.
+void TStreamStringHashCalcer::Start() {
+ XXH64_reset(&HashState, Seed);
+}
+
+// Feeds `size` bytes starting at `data` into the running XXH64 state.
+void TStreamStringHashCalcer::Update(const ui8* data, const ui32 size) {
+ XXH64_update(&HashState, data, size);
+}
+
+// Returns the XXH64 digest of the current state. This function does not reset
+// the state itself; call Start() to begin a new computation.
+ui64 TStreamStringHashCalcer::Finish() {
+ return XXH64_digest(&HashState);
+}
+
+}
diff --git a/ydb/core/tx/sharding/xx_hash.h b/ydb/core/tx/sharding/xx_hash.h
new file mode 100644
index 00000000000..2761ebdb6db
--- /dev/null
+++ b/ydb/core/tx/sharding/xx_hash.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "hash.h"
+
+#ifndef XXH_STATIC_LINKING_ONLY
+# define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
+#endif
+#include <contrib/libs/xxhash/xxhash.h>
+
+namespace NKikimr::NSharding {
+
+// IHashCalcer implementation on top of the streaming XXH64 API.
+class TStreamStringHashCalcer: public IHashCalcer {
+private:
+    ui64 Seed;
+    XXH64_state_t HashState;
+public:
+    // `seed` is used for every computation started by this object.
+    TStreamStringHashCalcer(const ui64 seed)
+        : Seed(seed) {
+        // Reset the state right away so that Update()/Finish() never operate
+        // on an uninitialized state if they are called before Start().
+        XXH64_reset(&HashState, Seed);
+    }
+
+    // Begins a new hash computation with the stored seed.
+    virtual void Start() override;
+    // Feeds `size` bytes starting at `data` into the current computation.
+    virtual void Update(const ui8* data, const ui32 size) override;
+    // Returns the digest of the bytes fed since the last Start().
+    virtual ui64 Finish() override;
+};
+
+}
diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp
index 7670e711c40..1a42e1059d1 100644
--- a/ydb/services/ydb/ydb_long_tx_ut.cpp
+++ b/ydb/services/ydb/ydb_long_tx_ut.cpp
@@ -3,8 +3,8 @@
#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/long_tx_service/public/types.h>
+#include <ydb/core/tx/sharding/sharding.h>
#include <ydb/core/formats/arrow_helpers.h>
-#include <ydb/core/formats/sharding.h>
#include <ydb/library/aclib/aclib.h>
using namespace NYdb;
@@ -35,8 +35,8 @@ TVector<std::shared_ptr<arrow::RecordBatch>> SplitData(const TString& data, ui32
std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, TTestOlap::ArrowSchema());
Y_VERIFY(batch);
- NArrow::TLogsSharding sharding(numBatches);
- std::vector<ui32> rowSharding = sharding.MakeSharding(batch, {"timestamp", "uid"});
+ NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches);
+ std::vector<ui32> rowSharding = sharding.MakeSharding(batch);
Y_VERIFY(rowSharding.size() == (size_t)batch->num_rows());
std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, numBatches);