diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-02 12:20:39 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-02 12:20:39 +0300 |
commit | c79a769325b2b08e32db32199488f0a03457481d (patch) | |
tree | c3396b76d94d395419357a3464185d9e88d9ae15 | |
parent | 0ae7543b3e35182cf8617606040b606cd595232a (diff) | |
download | ydb-c79a769325b2b08e32db32199488f0a03457481d.tar.gz |
distribute data by shards on upsert for cs
30 files changed, 911 insertions, 241 deletions
diff --git a/ydb/core/formats/sharding.h b/ydb/core/formats/sharding.h deleted file mode 100644 index ea9b6c97b2f..00000000000 --- a/ydb/core/formats/sharding.h +++ /dev/null @@ -1,189 +0,0 @@ -#pragma once -#include "arrow_helpers.h" - -#include <contrib/libs/xxhash/xxhash.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> - -#include <type_traits> - -namespace NKikimr::NArrow { - -class TShardingBase { -protected: - static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) { - NArrow::SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using T = typename TWrap::T; - using TArray = typename arrow::TypeTraits<T>::ArrayType; - - if (!array->IsNull(row)) { - auto& typedArray = static_cast<const TArray&>(*array); - auto value = typedArray.GetView(row); - if constexpr (arrow::has_string_view<T>()) { - concat.append(value.data(), value.size()); - } else if constexpr (arrow::has_c_type<T>()) { - if constexpr (arrow::is_physical_integer_type<T>()) { - concat.append(reinterpret_cast<const char*>(&value), sizeof(value)); - } else { - // Do not use bool or floats for sharding - static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>()); - } - } else { - static_assert(arrow::is_decimal_type<T>()); - } - } - return true; - }); - } -}; - -class THashSharding : public TShardingBase { -public: - THashSharding(ui32 shardsCount, ui64 seed = 0) - : ShardsCount(shardsCount) - , Seed(seed) - {} - - std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch, - const TVector<TString>& shardingColumns) const - { - std::vector<std::shared_ptr<arrow::Array>> columns; - columns.reserve(shardingColumns.size()); - - for (auto& colName : shardingColumns) { - auto array = batch->GetColumnByName(colName); - if (!array) { - return {}; - } - columns.emplace_back(array); - } - - std::vector<ui32> out(batch->num_rows()); - - if (columns.size() == 1) { - 
auto& array = columns[0]; - SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; - - auto& column = static_cast<const TArray&>(*array); - - for (int row = 0; row < batch->num_rows(); ++row) { - out[row] = ShardNo(column.GetView(row)); - } - return true; - }); - } else { - std::string concat; - for (int row = 0; row < batch->num_rows(); ++row) { - concat.clear(); - for (auto& column : columns) { - AppendField(column, row, concat); - } - - out[row] = ShardNo(concat); - } - } - - return out; - } - - template <typename T, std::enable_if_t<std::is_arithmetic<T>::value, bool> = true> - ui32 ShardNo(T value) const { - return XXH64(&value, sizeof(value), Seed) % ShardsCount; - } - -private: - ui32 ShardsCount; - ui64 Seed; - - ui32 ShardNo(arrow::util::string_view value) const { - return XXH64(value.data(), value.size(), Seed) % ShardsCount; - } -}; - -// KIKIMR-11529 -class TLogsSharding : public TShardingBase { -public: - static constexpr ui32 DEFAULT_ACITVE_SHARDS = 10; - static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5); - - // TODO - TLogsSharding(ui32 shardsCountTotal, ui32 shardsCountActive = DEFAULT_ACITVE_SHARDS, TDuration changePeriod = DEFAULT_CHANGE_PERIOD) - : ShardsCount(shardsCountTotal) - , NumActive(Min<ui32>(shardsCountActive, ShardsCount)) - , TsMin(0) - , ChangePeriod(changePeriod.MicroSeconds()) - {} - - // tsMin = GetTsMin(tabletIdsMap, timestamp); - // tabletIds = GetTableIdsByTs(tabletIdsMap, timestamp); - // numIntervals = tabletIds.size() / nActive; - // tsInterval = (timestamp - tsMin) / changePeriod; - // shardNo = (hash(uid) % nActive) + (tsInterval % numIntervals) * nActive; - // tabletId = tabletIds[shardNo]; - ui32 ShardNo(ui64 timestamp, arrow::util::string_view uid) const { - ui64 uidHash = XXH64(uid.data(), uid.size(), 0); - ui32 tsInterval = (timestamp - TsMin) / ChangePeriod; - ui32 numIntervals 
= ShardsCount / NumActive; - return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % ShardsCount; - } - - std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch, - const TVector<TString>& shardingColumns) const - { - if (shardingColumns.size() < 2) { - return {}; - } - - auto tsArray = batch->GetColumnByName(shardingColumns[0]); - if (!tsArray || tsArray->type_id() != arrow::Type::TIMESTAMP) { - return {}; - } - - std::vector<std::shared_ptr<arrow::Array>> extraColumns; - extraColumns.reserve(shardingColumns.size() - 1); - - for (size_t i = 1; i < shardingColumns.size(); ++i) { - auto array = batch->GetColumnByName(shardingColumns[i]); - if (!array) { - return {}; - } - extraColumns.emplace_back(array); - } - - auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray); - std::vector<ui32> out; - out.reserve(batch->num_rows()); - - if (extraColumns.size() == 1 && extraColumns[0]->type_id() == arrow::Type::STRING) { - auto column = std::static_pointer_cast<arrow::StringArray>(extraColumns[0]); - - for (int row = 0; row < batch->num_rows(); ++row) { - ui32 shardNo = ShardNo(tsColumn->Value(row), column->GetView(row)); - out.emplace_back(shardNo); - } - } else { - std::string concat; - for (int row = 0; row < batch->num_rows(); ++row) { - concat.clear(); - for (auto& column : extraColumns) { - AppendField(column, row, concat); - } - - ui32 shardNo = ShardNo(tsColumn->Value(row), concat); - out.emplace_back(shardNo); - } - } - - return out; - } - -private: - ui32 ShardsCount; - ui32 NumActive; - ui64 TsMin; - ui64 ChangePeriod; -}; - -} diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt index 3dc6b63a011..7b1e0c5eedb 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin.txt @@ -39,6 +39,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-sys_view ydb-core-tx core-tx-datashard + core-tx-sharding 
tx-long_tx_service-public ydb-core-ydb_convert yq-libs-actors diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index bb6c3dc35af..76a073859fb 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-sys_view ydb-core-tx core-tx-datashard + core-tx-sharding tx-long_tx_service-public ydb-core-ydb_convert yq-libs-actors diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt index bb6c3dc35af..76a073859fb 100644 --- a/ydb/core/grpc_services/CMakeLists.linux.txt +++ b/ydb/core/grpc_services/CMakeLists.linux.txt @@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-sys_view ydb-core-tx core-tx-datashard + core-tx-sharding tx-long_tx_service-public ydb-core-ydb_convert yq-libs-actors diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 5e548fbc8ca..a19151fa2c9 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -9,7 +9,7 @@ #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/formats/arrow_helpers.h> -#include <ydb/core/formats/sharding.h> +#include <ydb/core/tx/sharding/sharding.h> #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/columnshard/columnshard.h> @@ -97,10 +97,8 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, Y_VERIFY(description.HasSharding() && description.GetSharding().HasHashSharding()); auto& descSharding = description.GetSharding(); - auto& hashSharding = descSharding.GetHashSharding(); TVector<ui64> tabletIds(descSharding.GetColumnShards().begin(), descSharding.GetColumnShards().end()); - TVector<TString> 
shardingColumns(hashSharding.GetColumns().begin(), hashSharding.GetColumns().end()); ui32 numShards = tabletIds.size(); Y_VERIFY(numShards); TFullSplitData result(numShards); @@ -111,28 +109,15 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, return result; } + auto sharding = NSharding::TShardingBase::BuildShardingOperator(descSharding); std::vector<ui32> rowSharding; - if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N) { - NArrow::THashSharding sharding(numShards); - rowSharding = sharding.MakeSharding(batch, shardingColumns); - } else if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS) { - ui32 activeShards = NArrow::TLogsSharding::DEFAULT_ACITVE_SHARDS; - if (hashSharding.HasActiveShardsCount()) { - activeShards = hashSharding.GetActiveShardsCount(); - } - NArrow::TLogsSharding sharding(numShards, activeShards); - rowSharding = sharding.MakeSharding(batch, shardingColumns); + if (sharding) { + rowSharding = sharding->MakeSharding(batch); } - if (rowSharding.empty()) { result.ErrorString = "empty " - + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(hashSharding.GetFunction()) - + " sharding"; - for (auto& column : shardingColumns) { - if (batch->schema()->GetFieldIndex(column) < 0) { - result.ErrorString += ", no column '" + column + "'"; - } - } + + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(descSharding.GetHashSharding().GetFunction()) + + " sharding (" + (sharding ? 
sharding->DebugString() : "no sharding object") + ")"; return result; } diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt index e8d477e1826..ace4ec9165e 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-common PUBLIC core-kqp-expr_nodes core-kqp-provider tx-long_tx_service-public + core-tx-sharding yql-dq-expr_nodes ydb-library-aclib yql-core-issue diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index f1f616100c8..81a8be2dc08 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC core-kqp-expr_nodes core-kqp-provider tx-long_tx_service-public + core-tx-sharding yql-dq-expr_nodes ydb-library-aclib yql-core-issue diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt index f1f616100c8..81a8be2dc08 100644 --- a/ydb/core/kqp/common/CMakeLists.linux.txt +++ b/ydb/core/kqp/common/CMakeLists.linux.txt @@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC core-kqp-expr_nodes core-kqp-provider tx-long_tx_service-public + core-tx-sharding yql-dq-expr_nodes ydb-library-aclib yql-core-issue diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h index 542e9e03384..ffc5eb7f577 100644 --- a/ydb/core/kqp/common/kqp_resolve.h +++ b/ydb/core/kqp/common/kqp_resolve.h @@ -1,9 +1,11 @@ #pragma once #include <ydb/core/engine/mkql_keys.h> +#include <ydb/core/tx/sharding/sharding.h> #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> #include <ydb/core/protos/kqp_physical.pb.h> #include <ydb/core/scheme/scheme_tabledefs.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/yql/minikql/mkql_node.h> @@ -21,18 +23,30 @@ enum class ETableKind { class 
TKqpTableKeys { public: - struct TColumn { - ui32 Id; - NScheme::TTypeInfo Type; - TString TypeMod; - }; + using TColumn = NSharding::TShardingBase::TColumn; struct TTable { + public: TString Path; TMap<TString, TColumn> Columns; TVector<TString> KeyColumns; TVector<NScheme::TTypeInfo> KeyColumnTypes; ETableKind TableKind = ETableKind::Unknown; + TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> ColumnTableInfo; + + const TMap<TString, TColumn>& GetColumnsRemap() const { + return Columns; + } + + std::unique_ptr<NSharding::TShardingBase> BuildSharding() const { + if (ColumnTableInfo) { + auto result = NSharding::TShardingBase::BuildShardingOperator(ColumnTableInfo->Description.GetSharding()); + YQL_ENSURE(result); + return result; + } else { + return nullptr; + } + } }; TTable* FindTablePtr(const TTableId& id) { diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index 53a74080590..d609608dd45 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -57,18 +57,36 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( } NUdf::TUnboxedValue paramValue; + std::unique_ptr<NSharding::TShardingBase> sharding = table.BuildSharding(); + std::unique_ptr<NSharding::TUnboxedValueReader> unboxedReader; + if (sharding) { + unboxedReader = std::make_unique<NSharding::TUnboxedValueReader>(structType, table.GetColumnsRemap(), sharding->GetShardingColumns()); + } auto it = value.GetListIterator(); while (it.Next(paramValue)) { - auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices, - typeEnv, /* copyValues */ true); - Y_VERIFY_DEBUG(keyValue.size() == keyLen); - - ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes, - [] (const auto& partition) { return *partition.Range; }); + ui64 shardId = 0; + if (sharding) { + shardId = 
key.GetPartitions()[sharding->CalcShardId(paramValue, *unboxedReader)].ShardId; + } else { + auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices, + typeEnv, /* copyValues */ true); + Y_VERIFY_DEBUG(keyValue.size() == keyLen); + const ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes, + [](const auto& partition) { return *partition.Range; }); - auto point = TSerializedCellVec(TSerializedCellVec::Serialize(keyValue)); + shardId = key.GetPartitions()[partitionIndex].ShardId; - ui64 shardId = key.GetPartitions()[partitionIndex].ShardId; + auto point = TSerializedCellVec(TSerializedCellVec::Serialize(keyValue)); + auto& shardData = ret[shardId]; + if (key.GetPartitions()[partitionIndex].Range->IsPoint) { + // singular case when partition is just a point + shardData.FullRange.emplace(TSerializedTableRange(point.GetBuffer(), "", true, true)); + shardData.FullRange->Point = true; + shardData.Ranges.clear(); + } else { + shardData.Ranges.emplace_back(std::move(point)); + } + } auto& shardData = ret[shardId]; for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { @@ -87,14 +105,6 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( shardParamValues[shardId].emplace_back(std::move(paramValue)); - if (key.GetPartitions()[partitionIndex].Range->IsPoint) { - // singular case when partition is just a point - shardData.FullRange.emplace(TSerializedTableRange(point.GetBuffer(), "", true, true)); - shardData.FullRange->Point = true; - shardData.Ranges.clear(); - } else { - shardData.Ranges.emplace_back(std::move(point)); - } shardData.ParamType = itemType; } diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index 6cdcf4bd38b..f9dabdf25cf 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -159,6 +159,7 @@ private: STATEFN(ResolveKeysState) { switch 
(ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolveKeys); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveKeys); hFunc(TEvents::TEvPoison, HandleResolveKeys); default: { LOG_C("ResolveKeysState: unexpected event " << ev->GetTypeRewrite()); @@ -167,6 +168,43 @@ private: } } + void HandleResolveKeys(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (ShouldTerminate) { + PassAway(); + return; + } + auto& results = ev->Get()->Request->ResultSet; + if (results.size() != TableRequestIds.size()) { + ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, TIssue(TStringBuilder() << "navigation problems for tables")); + return; + } + LOG_D("Navigated key sets: " << results.size()); + for (auto& entry : results) { + if (!TableRequestIds.erase(entry.TableId)) { + ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, + YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() + << "Incorrect tableId in reply " << entry.TableId << '.')); + return; + } + if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, + YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() + << "Failed to resolve table " << entry.TableId << " keys: " << entry.Status << '.')); + return; + } + auto* table = TableKeys.FindTablePtr(entry.TableId); + if (!table) { + ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, + YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() + << "Incorrect tableId in table keys " << entry.TableId << '.')); + return; + } + table->ColumnTableInfo = entry.ColumnTableInfo; + } + NavigationFinished = true; + TryFinish(); + } + void HandleResolveKeys(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev) { if (ShouldTerminate) { PassAway(); @@ -215,12 +253,8 @@ private: } timer.reset(); - - auto replyEv = std::make_unique<TEvKqpExecuter::TEvTableResolveStatus>(); - replyEv->CpuTime = CpuTime; - - Send(Owner, 
replyEv.release()); - PassAway(); + ResolvingFinished = true; + TryFinish(); } void HandleResolveKeys(TEvents::TEvPoison::TPtr&) { @@ -231,6 +265,7 @@ private: void ResolveKeys() { FillKqpTasksGraphStages(TasksGraph, Transactions); + auto requestNavigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); request->ResultSet.reserve(TasksGraph.GetStagesInfo().size()); if (UserToken) { @@ -249,6 +284,13 @@ private: stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, operation); + if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.emplace(stageInfo.Meta.TableId).second) { + auto& entry = requestNavigate->ResultSet.emplace_back(); + entry.TableId = stageInfo.Meta.TableId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; + } + auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey)); entry.UserData = EncodeStageInfo(stageInfo); switch (operation) { @@ -266,7 +308,11 @@ private: } } } - + if (requestNavigate->ResultSet.size()) { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(requestNavigate.release())); + } else { + NavigationFinished = true; + } Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); } @@ -310,12 +356,26 @@ private: PassAway(); } + void TryFinish() { + if (!NavigationFinished || !ResolvingFinished) { + return; + } + auto replyEv = std::make_unique<TEvKqpExecuter::TEvTableResolveStatus>(); + replyEv->CpuTime = CpuTime; + + Send(Owner, replyEv.release()); + PassAway(); + } + private: const TActorId Owner; const ui64 TxId; const TMaybe<TString> UserToken; const TVector<IKqpGateway::TPhysicalTxData>& Transactions; TKqpTableKeys& TableKeys; + THashSet<TTableId> TableRequestIds; + bool NavigationFinished = false; + bool ResolvingFinished = false; // TODO: TableResolver should not 
populate TasksGraph as it's not related to its job (bad API). TKqpTasksGraph& TasksGraph; diff --git a/ydb/core/tx/CMakeLists.darwin.txt b/ydb/core/tx/CMakeLists.darwin.txt index ceb4de0b6fc..e4fe0eea1fc 100644 --- a/ydb/core/tx/CMakeLists.darwin.txt +++ b/ydb/core/tx/CMakeLists.darwin.txt @@ -18,6 +18,7 @@ add_subdirectory(scheme_cache) add_subdirectory(schemeshard) add_subdirectory(sequenceproxy) add_subdirectory(sequenceshard) +add_subdirectory(sharding) add_subdirectory(tiering) add_subdirectory(time_cast) add_subdirectory(tx_allocator) diff --git a/ydb/core/tx/CMakeLists.linux-aarch64.txt b/ydb/core/tx/CMakeLists.linux-aarch64.txt index 8f7572568bf..f2f8b8e3e5a 100644 --- a/ydb/core/tx/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/CMakeLists.linux-aarch64.txt @@ -18,6 +18,7 @@ add_subdirectory(scheme_cache) add_subdirectory(schemeshard) add_subdirectory(sequenceproxy) add_subdirectory(sequenceshard) +add_subdirectory(sharding) add_subdirectory(tiering) add_subdirectory(time_cast) add_subdirectory(tx_allocator) diff --git a/ydb/core/tx/CMakeLists.linux.txt b/ydb/core/tx/CMakeLists.linux.txt index 8f7572568bf..f2f8b8e3e5a 100644 --- a/ydb/core/tx/CMakeLists.linux.txt +++ b/ydb/core/tx/CMakeLists.linux.txt @@ -18,6 +18,7 @@ add_subdirectory(scheme_cache) add_subdirectory(schemeshard) add_subdirectory(sequenceproxy) add_subdirectory(sequenceshard) +add_subdirectory(sharding) add_subdirectory(tiering) add_subdirectory(time_cast) add_subdirectory(tx_allocator) diff --git a/ydb/core/tx/sharding/CMakeLists.darwin.txt b/ydb/core/tx/sharding/CMakeLists.darwin.txt new file mode 100644 index 00000000000..cab50904fd5 --- /dev/null +++ b/ydb/core/tx/sharding/CMakeLists.darwin.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(core-tx-sharding) +target_compile_options(core-tx-sharding PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-sharding PUBLIC + contrib-libs-cxxsupp + yutil + library-yql-minikql + library-yql-utils + yql-public-udf + ydb-core-formats + ydb-core-protos +) +target_sources(core-tx-sharding PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp +) diff --git a/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..5e702794935 --- /dev/null +++ b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(ut) + +add_library(core-tx-sharding) +target_compile_options(core-tx-sharding PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-sharding PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-minikql + library-yql-utils + yql-public-udf + ydb-core-formats + ydb-core-protos +) +target_sources(core-tx-sharding PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp +) diff --git a/ydb/core/tx/sharding/CMakeLists.linux.txt b/ydb/core/tx/sharding/CMakeLists.linux.txt new file mode 100644 index 00000000000..5e702794935 --- /dev/null +++ b/ydb/core/tx/sharding/CMakeLists.linux.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(ut) + +add_library(core-tx-sharding) +target_compile_options(core-tx-sharding PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-sharding PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-minikql + library-yql-utils + yql-public-udf + ydb-core-formats + ydb-core-protos +) +target_sources(core-tx-sharding PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp +) diff --git a/ydb/core/tx/sharding/CMakeLists.txt b/ydb/core/tx/sharding/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/core/tx/sharding/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/tx/sharding/hash.cpp b/ydb/core/tx/sharding/hash.cpp new file mode 100644 index 00000000000..b770619203a --- /dev/null +++ b/ydb/core/tx/sharding/hash.cpp @@ -0,0 +1,5 @@ +#include "hash.h" + +namespace NKikimr::NSharding { + +} diff --git a/ydb/core/tx/sharding/hash.h b/ydb/core/tx/sharding/hash.h new file mode 100644 index 00000000000..0f98c6f1602 --- /dev/null +++ b/ydb/core/tx/sharding/hash.h @@ -0,0 +1,13 @@ +#pragma once +#include <util/system/types.h> + +namespace NKikimr::NSharding { + +class IHashCalcer { +public: + virtual void Update(const ui8* data, const ui32 size) = 0; + virtual ui64 Finish() = 0; + virtual void Start() = 0; +}; + +} diff --git a/ydb/core/tx/sharding/sharding.cpp b/ydb/core/tx/sharding/sharding.cpp new file mode 100644 index 00000000000..a3f006db34c --- /dev/null +++ b/ydb/core/tx/sharding/sharding.cpp @@ -0,0 +1,178 @@ +#include "sharding.h" +#include "xx_hash.h" +#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <util/string/join.h> + +namespace NKikimr::NSharding { + +void TShardingBase::AppendField(const std::shared_ptr<arrow::Array>& array, int row, IHashCalcer& hashCalcer) { + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using T = typename TWrap::T; + using TArray = typename arrow::TypeTraits<T>::ArrayType; + + if (!array->IsNull(row)) { + auto& typedArray = static_cast<const TArray&>(*array); + auto value = typedArray.GetView(row); + if constexpr (arrow::has_string_view<T>()) { + hashCalcer.Update((const ui8*)value.data(), value.size()); + } 
else if constexpr (arrow::has_c_type<T>()) { + if constexpr (arrow::is_physical_integer_type<T>()) { + hashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + } else { + // Do not use bool or floats for sharding + static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>()); + } + } else { + static_assert(arrow::is_decimal_type<T>()); + } + } + return true; + }); +} + +void TUnboxedValueReader::BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const { + for (auto&& i : ColumnsInfo) { + auto columnValue = value.GetElement(i.Idx); + if (columnValue.IsString()) { + hashCalcer.Update((const ui8*)columnValue.AsStringRef().Data(), columnValue.AsStringRef().Size()); + } else if (columnValue.IsEmbedded()) { + switch (i.Type.GetTypeId()) { + case NScheme::NTypeIds::Uint16: + FieldToHashString<ui16>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Uint32: + FieldToHashString<ui32>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Uint64: + FieldToHashString<ui64>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Int16: + FieldToHashString<i16>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Int32: + FieldToHashString<i32>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Int64: + FieldToHashString<i64>(columnValue, hashCalcer); + continue; + } + YQL_ENSURE(false, "incorrect column type for shard calculation"); + } else { + YQL_ENSURE(false, "incorrect column type for shard calculation"); + } + } +} + +TUnboxedValueReader::TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, + const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns) { + YQL_ENSURE(shardingColumns.size()); + for (auto&& i : shardingColumns) { + auto it = columnsRemap.find(i); + YQL_ENSURE(it != columnsRemap.end()); + ColumnsInfo.emplace_back(TColumnUnboxedPlaceInfo(it->second, structInfo->GetMemberIndex(i), i)); + } +} 
+ +std::unique_ptr<TShardingBase> TShardingBase::BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& sharding) { + if (sharding.HasHashSharding()) { + auto& hashSharding = sharding.GetHashSharding(); + std::vector<TString> columnNames(hashSharding.GetColumns().begin(), hashSharding.GetColumns().end()); + if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N) { + return std::make_unique<THashSharding>(sharding.GetColumnShards().size(), columnNames, 0); + } else if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS) { + ui32 activeShards = TLogsSharding::DEFAULT_ACITVE_SHARDS; + if (hashSharding.HasActiveShardsCount()) { + activeShards = hashSharding.GetActiveShardsCount(); + } + return std::make_unique<TLogsSharding>(sharding.GetColumnShards().size(), columnNames, activeShards); + } + } + return nullptr; +} + +TString TShardingBase::DebugString() const { + return "Columns: " + JoinSeq(", ", GetShardingColumns()); +} + + +std::vector<ui32> THashSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const { + std::vector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(ShardingColumns.size()); + + for (auto& colName : ShardingColumns) { + auto array = batch->GetColumnByName(colName); + if (!array) { + return {}; + } + columns.emplace_back(array); + } + + std::vector<ui32> out(batch->num_rows()); + + TStreamStringHashCalcer hashCalcer(Seed); + + for (int row = 0; row < batch->num_rows(); ++row) { + hashCalcer.Start(); + for (auto& column : columns) { + AppendField(column, row, hashCalcer); + } + out[row] = hashCalcer.Finish() % ShardsCount; + } + + return out; +} + +ui32 THashSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { + TStreamStringHashCalcer hashCalcer(Seed); + hashCalcer.Start(); + readerInfo.BuildStringForHash(value, hashCalcer); + return 
hashCalcer.Finish() % ShardsCount; +} + +std::vector<ui32> TLogsSharding::MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const { + if (ShardingColumns.size() < 2) { + return {}; + } + + auto tsArray = batch->GetColumnByName(ShardingColumns[0]); + if (!tsArray || tsArray->type_id() != arrow::Type::TIMESTAMP) { + return {}; + } + + std::vector<std::shared_ptr<arrow::Array>> extraColumns; + extraColumns.reserve(ShardingColumns.size() - 1); + + for (size_t i = 1; i < ShardingColumns.size(); ++i) { + auto array = batch->GetColumnByName(ShardingColumns[i]); + if (!array) { + return {}; + } + extraColumns.emplace_back(array); + } + + auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray); + std::vector<ui32> out; + out.reserve(batch->num_rows()); + + TStreamStringHashCalcer hashCalcer(0); + for (int row = 0; row < batch->num_rows(); ++row) { + hashCalcer.Start(); + for (auto& column : extraColumns) { + AppendField(column, row, hashCalcer); + } + + const ui32 shardNo = ShardNo(tsColumn->Value(row), hashCalcer.Finish()); + out.emplace_back(shardNo); + } + + return out; +} + +ui32 TLogsSharding::CalcShardId(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const { + YQL_ENSURE(false); + return 0; +} + +} diff --git a/ydb/core/tx/sharding/sharding.h b/ydb/core/tx/sharding/sharding.h new file mode 100644 index 00000000000..3d0e0350cca --- /dev/null +++ b/ydb/core/tx/sharding/sharding.h @@ -0,0 +1,143 @@ +#pragma once + +#include "hash.h" + +#include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> + +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/accessor/accessor.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> +#include <contrib/libs/xxhash/xxhash.h> + +#include <type_traits> + +namespace NKikimr::NMiniKQL { +class TStructType; +} + +namespace NKikimr::NSharding { + +struct TExternalTableColumn { + ui32 Id; + 
NScheme::TTypeInfo Type;
+    TString TypeMod;
+};
+
+// TExternalTableColumn extended with the Idx and Name values passed to the constructor.
+struct TColumnUnboxedPlaceInfo: public TExternalTableColumn {
+private:
+    using TBase = TExternalTableColumn;
+public:
+    const ui32 Idx;
+    const TString Name;
+
+    TColumnUnboxedPlaceInfo(const TExternalTableColumn& baseInfo, const ui32 idx, const TString& name)
+        : TBase(baseInfo)
+        , Idx(idx)
+        , Name(name) {
+
+    }
+};
+
+// Feeds an unboxed value into an IHashCalcer (see BuildStringForHash).
+// Constructed from a struct type, a column-name -> column map and the list of
+// sharding column names; the resulting per-column info is exposed read-only
+// through the YDB_READONLY_DEF accessor.
+class TUnboxedValueReader {
+private:
+    YDB_READONLY_DEF(std::vector<TColumnUnboxedPlaceInfo>, ColumnsInfo);
+    // Reads `value` as the arithmetic type T and passes the bytes of that
+    // T object to the hash calcer.
+    template <class T>
+    static void FieldToHashString(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) {
+        static_assert(std::is_arithmetic<T>::value);
+        const T result = value.Get<T>();
+        hashCalcer.Update(reinterpret_cast<const ui8*>(&result), sizeof(result));
+    }
+public:
+    void BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const;
+    TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns);
+};
+
+// Interface of a sharding operator. Implementations report their sharding
+// columns, compute a shard id for one unboxed value (CalcShardId) and a vector
+// of ui32 values for an arrow record batch (MakeSharding).
+class TShardingBase {
+public:
+    using TColumn = TExternalTableColumn;
+
+protected:
+    static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, IHashCalcer& hashCalcer);
+public:
+    // Factory: returns the operator matching `shardingInfo`, or nullptr.
+    static std::unique_ptr<TShardingBase> BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& shardingInfo);
+
+    virtual const std::vector<TString>& GetShardingColumns() const = 0;
+    virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0;
+    virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+
+    virtual TString DebugString() const;
+
+    virtual ~TShardingBase() = default;
+};
+
+// Sharding operator configured with a shards count, a hash seed and the
+// sharding column names.
+class THashSharding : public TShardingBase {
+private:
+    ui32 ShardsCount;
+    ui64 Seed;
+    std::vector<TString> ShardingColumns;
+public:
+    THashSharding(ui32 shardsCount, const std::vector<TString>& columnNames, ui64 seed = 0)
+        : ShardsCount(shardsCount)
+        , 
Seed(seed)
+        , ShardingColumns(columnNames)
+    {}
+
+    virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+
+    // Shard index of a single arithmetic value: XXH64 over the bytes of
+    // `value`, reduced modulo `shardsCount` (which must be non-zero).
+    // The seed is 64-bit, matching both XXH64 and the Seed member.
+    template <typename T>
+    static ui32 ShardNo(const T value, const ui32 shardsCount, const ui64 seed = 0) {
+        Y_ASSERT(shardsCount);
+        static_assert(std::is_arithmetic<T>::value);
+        return XXH64(&value, sizeof(value), seed) % shardsCount;
+    }
+    virtual const std::vector<TString>& GetShardingColumns() const override {
+        return ShardingColumns;
+    }
+
+    virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
+};
+
+// KIKIMR-11529
+// Sharding operator that maps a (timestamp, hash) pair to a shard: the hash
+// selects one of NumActive shards, and the timestamp interval (ChangePeriod
+// long) selects which group of NumActive shards is used.
+class TLogsSharding : public TShardingBase {
+private:
+    ui32 ShardsCount;
+    ui32 NumActive;
+    ui64 TsMin;
+    ui64 ChangePeriod;
+    const std::vector<TString> ShardingColumns;
+public:
+    static constexpr ui32 DEFAULT_ACITVE_SHARDS = 10;
+    static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5);
+
+    // NumActive is capped by the total shards count; ChangePeriod is stored in microseconds.
+    TLogsSharding(ui32 shardsCountTotal, const std::vector<TString>& columnNames, ui32 shardsCountActive, TDuration changePeriod = DEFAULT_CHANGE_PERIOD)
+        : ShardsCount(shardsCountTotal)
+        , NumActive(Min<ui32>(shardsCountActive, ShardsCount))
+        , TsMin(0)
+        , ChangePeriod(changePeriod.MicroSeconds())
+        , ShardingColumns(columnNames)
+    {}
+
+    // tsMin = GetTsMin(tabletIdsMap, timestamp);
+    // tabletIds = GetTableIdsByTs(tabletIdsMap, timestamp);
+    // numIntervals = tabletIds.size() / nActive;
+    // tsInterval = (timestamp - tsMin) / changePeriod;
+    // shardNo = (hash(uid) % nActive) + (tsInterval % numIntervals) * nActive;
+    // tabletId = tabletIds[shardNo];
+    //
+    // Requires NumActive != 0 and ChangePeriod != 0 (both are divisors below).
+    ui32 ShardNo(ui64 timestamp, const ui64 uidHash) const {
+        Y_ASSERT(NumActive);
+        Y_ASSERT(ChangePeriod);
+        // Kept 64-bit: the quotient does not fit into ui32 for small change periods.
+        const ui64 tsInterval = (timestamp - TsMin) / ChangePeriod;
+        const ui32 numIntervals = ShardsCount / NumActive;
+        return ((uidHash % NumActive) + (tsInterval % numIntervals) * NumActive) % ShardsCount;
+    }
+
+    virtual const std::vector<TString>& GetShardingColumns() const override {
+        return ShardingColumns;
+    
} + virtual ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; + + virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + +}; + +} diff --git a/ydb/core/tx/sharding/ut/CMakeLists.darwin.txt b/ydb/core/tx/sharding/ut/CMakeLists.darwin.txt new file mode 100644 index 00000000000..c60bb0dd917 --- /dev/null +++ b/ydb/core/tx/sharding/ut/CMakeLists.darwin.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-sharding-ut) +target_compile_options(ydb-core-tx-sharding-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-sharding-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering +) +target_link_libraries(ydb-core-tx-sharding-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-tiering + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + core-tx-sharding + public-lib-yson_value +) +target_link_options(ydb-core-tx-sharding-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-sharding-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/ut/ut_sharding.cpp +) +set_property( + TARGET + ydb-core-tx-sharding-ut + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-sharding-ut + TEST_TARGET + ydb-core-tx-sharding-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) 
+set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-core-tx-sharding-ut) diff --git a/ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..64f6dac1437 --- /dev/null +++ b/ydb/core/tx/sharding/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,84 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-sharding-ut) +target_compile_options(ydb-core-tx-sharding-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-sharding-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering +) +target_link_libraries(ydb-core-tx-sharding-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + core-tx-tiering + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + core-tx-sharding + public-lib-yson_value +) +target_link_options(ydb-core-tx-sharding-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-sharding-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/ut/ut_sharding.cpp +) +set_property( + TARGET + ydb-core-tx-sharding-ut + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-sharding-ut + TEST_TARGET + ydb-core-tx-sharding-ut + TEST_ARG + --print-before-suite + 
--print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-core-tx-sharding-ut) diff --git a/ydb/core/tx/sharding/ut/CMakeLists.linux.txt b/ydb/core/tx/sharding/ut/CMakeLists.linux.txt new file mode 100644 index 00000000000..aa8b0faf5eb --- /dev/null +++ b/ydb/core/tx/sharding/ut/CMakeLists.linux.txt @@ -0,0 +1,86 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(ydb-core-tx-sharding-ut) +target_compile_options(ydb-core-tx-sharding-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-sharding-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering +) +target_link_libraries(ydb-core-tx-sharding-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-tiering + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + core-tx-sharding + public-lib-yson_value +) +target_link_options(ydb-core-tx-sharding-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-sharding-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/ut/ut_sharding.cpp +) +set_property( + TARGET + ydb-core-tx-sharding-ut + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-sharding-ut + TEST_TARGET + ydb-core-tx-sharding-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-sharding-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-core-tx-sharding-ut) diff --git a/ydb/core/tx/sharding/ut/CMakeLists.txt b/ydb/core/tx/sharding/ut/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/core/tx/sharding/ut/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+  include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+  include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+  include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/tx/sharding/ut/ut_sharding.cpp b/ydb/core/tx/sharding/ut/ut_sharding.cpp
new file mode 100644
index 00000000000..7e8ae24199b
--- /dev/null
+++ b/ydb/core/tx/sharding/ut/ut_sharding.cpp
@@ -0,0 +1,32 @@
+#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/tx/sharding/sharding.h>
+#include <ydb/core/tx/sharding/xx_hash.h>
+
+#include <library/cpp/actors/core/av_bootstrapped.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+Y_UNIT_TEST_SUITE(Sharding) {
+
+    // Checks that the streaming calcer, fed one random byte at a time, yields
+    // after every byte the same digest as one-shot XXH64 over the bytes so far.
+    Y_UNIT_TEST(XXUsage) {
+        NSharding::TStreamStringHashCalcer hCalcer(0);
+        for (ui32 a = 1; a < 10; ++a) {
+            TString ss;
+            // Start() begins a new stream, so the calcer is reused across rounds.
+            hCalcer.Start();
+            for (ui32 i = 0; i < 10000; ++i) {
+                const ui8 c = RandomNumber<ui8>();
+                hCalcer.Update(&c, 1);
+                ss += static_cast<char>(c);
+                const ui64 streamed = hCalcer.Finish();
+                // VALUES_EQUAL prints both digests when they differ.
+                UNIT_ASSERT_VALUES_EQUAL(streamed, XXH64(ss.data(), ss.size(), 0));
+                if (i % 1000 == 0) {
+                    Cerr << streamed << Endl;
+                }
+            }
+        }
+    }
+}
+}
diff --git a/ydb/core/tx/sharding/xx_hash.cpp b/ydb/core/tx/sharding/xx_hash.cpp
new file mode 100644
index 00000000000..d5e0c416977
--- /dev/null
+++ b/ydb/core/tx/sharding/xx_hash.cpp
@@ -0,0 +1,17 @@
+#include "xx_hash.h"
+
+namespace NKikimr::NSharding {
+
+// Resets the XXH64 state with the stored seed, starting a new hash stream.
+void TStreamStringHashCalcer::Start() {
+    XXH64_reset(&HashState, Seed);
+}
+
+// Appends `size` bytes at `data` to the current hash stream.
+void TStreamStringHashCalcer::Update(const ui8* data, const ui32 size) {
+    XXH64_update(&HashState, data, size);
+}
+
+// Digest of everything appended since the last Start().
+ui64 TStreamStringHashCalcer::Finish() {
+    return 
XXH64_digest(&HashState);
+}
+
+}
diff --git a/ydb/core/tx/sharding/xx_hash.h b/ydb/core/tx/sharding/xx_hash.h
new file mode 100644
index 00000000000..2761ebdb6db
--- /dev/null
+++ b/ydb/core/tx/sharding/xx_hash.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "hash.h"
+
+#ifndef XXH_STATIC_LINKING_ONLY
+# define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
+#endif
+#include <contrib/libs/xxhash/xxhash.h>
+
+namespace NKikimr::NSharding {
+
+// IHashCalcer backed by the streaming XXH64 API: Start() resets the state with
+// the seed, Update() appends bytes, Finish() returns the digest.
+class TStreamStringHashCalcer: public IHashCalcer {
+private:
+    ui64 Seed;
+    XXH64_state_t HashState;
+public:
+    TStreamStringHashCalcer(const ui64 seed)
+        : Seed(seed) {
+        // Give HashState a defined value right away, so that Update()/Finish()
+        // called before the first Start() do not read an uninitialised state.
+        XXH64_reset(&HashState, Seed);
+    }
+
+    virtual void Start() override;
+    virtual void Update(const ui8* data, const ui32 size) override;
+    virtual ui64 Finish() override;
+};
+
+}
diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp
index 7670e711c40..1a42e1059d1 100644
--- a/ydb/services/ydb/ydb_long_tx_ut.cpp
+++ b/ydb/services/ydb/ydb_long_tx_ut.cpp
@@ -3,8 +3,8 @@
 #include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
 #include <ydb/core/tx/columnshard/columnshard.h>
 #include <ydb/core/tx/long_tx_service/public/types.h>
+#include <ydb/core/tx/sharding/sharding.h>
 #include <ydb/core/formats/arrow_helpers.h>
-#include <ydb/core/formats/sharding.h>
 #include <ydb/library/aclib/aclib.h>
 
 using namespace NYdb;
@@ -35,8 +35,8 @@ TVector<std::shared_ptr<arrow::RecordBatch>> SplitData(const TString& data, ui32
     std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, TTestOlap::ArrowSchema());
     Y_VERIFY(batch);
 
-    NArrow::TLogsSharding sharding(numBatches);
-    std::vector<ui32> rowSharding = sharding.MakeSharding(batch, {"timestamp", "uid"});
+    NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches);
+    std::vector<ui32> rowSharding = sharding.MakeSharding(batch);
     Y_VERIFY(rowSharding.size() == (size_t)batch->num_rows());
 
     std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, 
numBatches); |