diff options
author | chertus <azuikov@ydb.tech> | 2022-09-23 20:35:10 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-09-23 20:35:10 +0300 |
commit | 8d89d23cb935648b05b3528f8083f55b30e2c1be (patch) | |
tree | 06a68648c4a366f8a6b9318904acb93f628aebfc | |
parent | 7c520dd48df863dfd949097ee2842a4781031d31 (diff) | |
download | ydb-8d89d23cb935648b05b3528f8083f55b30e2c1be.tar.gz |
sharding hash function in LogStore service
-rw-r--r-- | ydb/core/grpc_services/rpc_log_store.cpp | 17 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 8 | ||||
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 5 | ||||
-rw-r--r-- | ydb/public/api/protos/draft/ydb_logstore.proto | 18 | ||||
-rw-r--r-- | ydb/public/lib/experimental/ydb_logstore.cpp | 60 | ||||
-rw-r--r-- | ydb/public/lib/experimental/ydb_logstore.h | 52 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_logstore_ut.cpp | 35 |
7 files changed, 135 insertions, 60 deletions
diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp index 2ae3f1ffcc..e82b54488f 100644 --- a/ydb/core/grpc_services/rpc_log_store.cpp +++ b/ydb/core/grpc_services/rpc_log_store.cpp @@ -205,7 +205,7 @@ private: modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore); auto create = modifyScheme->MutableCreateColumnStore(); create->SetName(name); - create->SetColumnShardCount(req->column_shard_count()); + create->SetColumnShardCount(req->shards_count()); for (const auto& schemaPreset : req->schema_presets()) { auto* toSchemaPreset = create->AddSchemaPresets(); toSchemaPreset->SetName(schemaPreset.name()); @@ -267,7 +267,7 @@ private: } ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true); const auto& storeDescription = pathDescription.GetColumnStoreDescription(); - describeLogStoreResult.set_column_shard_count(storeDescription.GetColumnShardCount()); + describeLogStoreResult.set_shards_count(storeDescription.GetColumnShardCount()); bool firstPreset = true; for (const auto& schemaPreset : storeDescription.GetSchemaPresets()) { @@ -455,9 +455,16 @@ private: } } - create->SetColumnShardCount(req->column_shard_count()); + create->SetColumnShardCount(req->shards_count()); auto* sharding = create->MutableSharding()->MutableHashSharding(); - sharding->SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS); + if (req->sharding_type() == Ydb::LogStore::ShardingHashType::HASH_TYPE_MODULO_N) { + sharding->SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N); + } else { + sharding->SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS); + if (req->active_shards_count()) { + sharding->SetActiveShardsCount(req->active_shards_count()); + } + } sharding->MutableColumns()->CopyFrom(req->sharding_columns()); ctx.Send(MakeTxProxyID(), proposeRequest.release()); } @@ -504,7 +511,7 @@ private: } ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true); const auto& tableDescription = pathDescription.GetColumnTableDescription(); - describeLogTableResult.set_column_shard_count(tableDescription.GetColumnShardCount()); + describeLogTableResult.set_shards_count(tableDescription.GetColumnShardCount()); Ydb::StatusIds::StatusCode status; TString error; if (!ConvertSchemaFromInternalToPublic(tableDescription.GetSchema(), *describeLogTableResult.mutable_schema(), status, error)) { diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 01dc3997c7..276a500ea7 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -107,11 +107,15 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, } std::vector<ui32> rowSharding; - if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_DEFAULT) { + if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N) { NArrow::THashSharding sharding(numShards); rowSharding = sharding.MakeSharding(batch, shardingColumns); } else if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS) { - NArrow::TLogsSharding sharding(numShards); + ui32 activeShards = NArrow::TLogsSharding::DEFAULT_ACITVE_SHARDS; + if (hashSharding.HasActiveShardsCount()) { + activeShards = hashSharding.GetActiveShardsCount(); + } + NArrow::TLogsSharding sharding(numShards, activeShards); rowSharding = sharding.MakeSharding(batch, shardingColumns); } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 5d435b1fda..fb78fceeb7 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -590,7 +590,7 @@ message TColumnTableSharding { message THashSharding { enum EHashFunction { // A simple hash modulo number of shards - HASH_FUNCTION_DEFAULT = 0; + HASH_FUNCTION_MODULO_N = 0; HASH_FUNCTION_CLOUD_LOGS = 1; } @@ -605,6 +605,9 @@ message TColumnTableSharding { // changed and older data with the same column values may be left on // a different shard. optional bool UniqueShardKey = 3; + + // Argument for HASH_FUNCTION_CLOUD_LOGS + optional uint32 ActiveShardsCount = 4; } oneof Method { diff --git a/ydb/public/api/protos/draft/ydb_logstore.proto b/ydb/public/api/protos/draft/ydb_logstore.proto index 75e44eae55..0f42cc5a9a 100644 --- a/ydb/public/api/protos/draft/ydb_logstore.proto +++ b/ydb/public/api/protos/draft/ydb_logstore.proto @@ -23,6 +23,12 @@ message Compression { int32 compression_level = 2; } +enum ShardingHashType { + HASH_TYPE_UNSPECIFIED = 0x0000; + HASH_TYPE_MODULO_N = 0x0001; // hash(sharding_columns) % N + HASH_TYPE_LOGS_SPECIAL = 0x0002; +} + message ColumnMeta { string name = 1; Type type = 2; @@ -61,7 +67,7 @@ message CreateLogStoreRequest { Ydb.Operations.OperationParams operation_params = 1; string path = 2; // Full path - uint32 column_shard_count = 3; + uint32 shards_count = 3; repeated SchemaPreset schema_presets = 4; repeated TierConfig tiers = 5; } @@ -79,7 +85,7 @@ message DescribeLogStoreRequest { message DescribeLogStoreResult { Ydb.Scheme.Entry self = 1; // Description of scheme object - uint32 column_shard_count = 2; + uint32 shards_count = 2; repeated SchemaPreset schema_presets = 3; repeated TierConfig tiers = 4; } @@ -133,8 +139,10 @@ message CreateLogTableRequest { }; // Specifies the desired number of ColumnShards for this table - uint32 column_shard_count = 7; + uint32 shards_count = 7; repeated string sharding_columns = 8; + ShardingHashType sharding_type = 9; + uint32 active_shards_count = 10; // optional parameter for HASH_TYPE_LOGS_SPECIAL } message CreateLogTableResponse { @@ -158,8 +166,10 @@ message DescribeLogTableResult { } // Specifies the desired number of ColumnShards for this table - uint32 column_shard_count = 6; + uint32 shards_count = 6; repeated string sharding_columns = 7; + ShardingHashType sharding_type = 8; + uint32 active_shards_count = 9; } message DescribeLogTableResponse { diff --git a/ydb/public/lib/experimental/ydb_logstore.cpp b/ydb/public/lib/experimental/ydb_logstore.cpp index bfa5e1fe5b..c8b2189da6 100644 --- a/ydb/public/lib/experimental/ydb_logstore.cpp +++ b/ydb/public/lib/experimental/ydb_logstore.cpp @@ -104,16 +104,16 @@ void TSchema::SerializeTo(Ydb::LogStore::Schema& schema) const { DefaultCompression.SerializeTo(*schema.mutable_default_compression()); } -TLogStoreDescription::TLogStoreDescription(ui32 columnShardCount, const THashMap<TString, TSchema>& schemaPresets, +TLogStoreDescription::TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets, const THashMap<TString, TTierConfig>& tierConfigs) - : ColumnShardCount(columnShardCount) + : ShardsCount(shardsCount) , SchemaPresets(schemaPresets) , TierConfigs(tierConfigs) {} TLogStoreDescription::TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc, const TDescribeLogStoreSettings& describeSettings) - : ColumnShardCount(desc.column_shard_count()) + : ShardsCount(desc.shards_count()) , SchemaPresets() , Owner(desc.self().owner()) { @@ -138,7 +138,7 @@ void TLogStoreDescription::SerializeTo(Ydb::LogStore::CreateLogStoreRequest& req pb.set_name(presetName); presetSchema.SerializeTo(*pb.mutable_schema()); } - request.set_column_shard_count(ColumnShardCount); + request.set_shards_count(ShardsCount); for (const auto& [tierName, tierCfg] : TierConfigs) { auto& pb = *request.add_tiers(); pb.set_name(tierName); @@ -153,35 +153,44 @@ TDescribeLogStoreResult::TDescribeLogStoreResult(TStatus&& status, Ydb::LogStore {} -TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns, - ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings) +TLogTableSharding::TLogTableSharding(const Ydb::LogStore::DescribeLogTableResult& desc) + : Type(EShardingHashType::HASH_TYPE_UNSPECIFIED) + , Columns(desc.sharding_columns().begin(), desc.sharding_columns().end()) + , ShardsCount(desc.shards_count()) + , ActiveShardsCount(desc.active_shards_count()) +{ + if (desc.sharding_type() == Ydb::LogStore::ShardingHashType::HASH_TYPE_MODULO_N) { + Type = EShardingHashType::HASH_TYPE_MODULO_N; + } else if (desc.sharding_type() == Ydb::LogStore::ShardingHashType::HASH_TYPE_LOGS_SPECIAL) { + Type = EShardingHashType::HASH_TYPE_LOGS_SPECIAL; + } +} + +TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding, + const TMaybe<TTtlSettings>& ttlSettings) : SchemaPresetName(schemaPresetName) - , ShardingColumns(shardingColumns) - , ColumnShardCount(columnShardCount) + , Sharding(sharding) , TtlSettings(ttlSettings) {} -TLogTableDescription::TLogTableDescription(const TSchema& schema, const TVector<TString>& shardingColumns, - ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings) +TLogTableDescription::TLogTableDescription(const TSchema& schema, const TLogTableSharding& sharding, + const TMaybe<TTtlSettings>& ttlSettings) : Schema(schema) - , ShardingColumns(shardingColumns) - , ColumnShardCount(columnShardCount) + , Sharding(sharding) , TtlSettings(ttlSettings) {} -TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns, - ui32 columnShardCount, const THashMap<TString, TTier>& tiers) +TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding, + const THashMap<TString, TTier>& tiers) : SchemaPresetName(schemaPresetName) - , ShardingColumns(shardingColumns) - , ColumnShardCount(columnShardCount) + , Sharding(sharding) , Tiers(tiers) {} TLogTableDescription::TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc, const TDescribeLogTableSettings& describeSettings) : Schema(desc.schema()) - , ShardingColumns(desc.sharding_columns().begin(), desc.sharding_columns().end()) - , ColumnShardCount(desc.column_shard_count()) + , Sharding(desc) , TtlSettings(TtlSettingsFromProto(desc.ttl_settings())) , Owner(desc.self().owner()) { @@ -195,10 +204,21 @@ void TLogTableDescription::SerializeTo(Ydb::LogStore::CreateLogTableRequest& req Schema.SerializeTo(*request.mutable_schema()); } request.set_schema_preset_name(SchemaPresetName); - request.set_column_shard_count(ColumnShardCount); - for (const auto& sc : ShardingColumns) { + request.set_shards_count(Sharding.ShardsCount); + if (Sharding.ActiveShardsCount) { + request.set_active_shards_count(Sharding.ActiveShardsCount); + } + for (const auto& sc : Sharding.Columns) { request.add_sharding_columns(sc); } + Ydb::LogStore::ShardingHashType shardingType = (Sharding.Type == NYdb::NLogStore::HASH_TYPE_MODULO_N) ? + Ydb::LogStore::ShardingHashType::HASH_TYPE_MODULO_N : + Ydb::LogStore::ShardingHashType::HASH_TYPE_LOGS_SPECIAL; + request.set_sharding_type(shardingType); + if (Sharding.ActiveShardsCount) { + request.set_active_shards_count(Sharding.ActiveShardsCount); + } + if (TtlSettings) { TtlSettings->SerializeTo(*request.mutable_ttl_settings()); } diff --git a/ydb/public/lib/experimental/ydb_logstore.h b/ydb/public/lib/experimental/ydb_logstore.h index 949f0cb656..2e04dfb5fc 100644 --- a/ydb/public/lib/experimental/ydb_logstore.h +++ b/ydb/public/lib/experimental/ydb_logstore.h @@ -26,6 +26,12 @@ enum class EColumnCompression { ZSTD }; +enum EShardingHashType { + HASH_TYPE_UNSPECIFIED, + HASH_TYPE_MODULO_N, + HASH_TYPE_LOGS_SPECIAL, +}; + struct TCompression { EColumnCompression Codec = EColumnCompression::LZ4; TMaybe<int> Level; @@ -111,15 +117,15 @@ private: class TLogStoreDescription { public: - TLogStoreDescription(ui32 columnShardCount, const THashMap<TString, TSchema>& schemaPresets, + TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets, const THashMap<TString, TTierConfig>& tierConfigs = {}); TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc, const TDescribeLogStoreSettings& describeSettings); void SerializeTo(Ydb::LogStore::CreateLogStoreRequest& request) const; const THashMap<TString, TSchema>& GetSchemaPresets() const { return SchemaPresets; } - ui32 GetColumnShardCount() const { - return ColumnShardCount; + ui32 GetShardsCount() const { + return ShardsCount; } const TString& GetOwner() const { return Owner; @@ -135,7 +141,7 @@ public: } private: - ui32 ColumnShardCount; + ui32 ShardsCount; THashMap<TString, TSchema> SchemaPresets; TString Owner; TVector<NScheme::TPermissions> Permissions; @@ -143,24 +149,41 @@ private: THashMap<TString, TTierConfig> TierConfigs; }; +struct TLogTableSharding { + EShardingHashType Type; + TVector<TString> Columns; + ui32 ShardsCount; + ui32 ActiveShardsCount; + + TLogTableSharding(EShardingHashType type, const TVector<TString>& columns, ui32 shardsCount, ui32 activeShards = 0) + : Type(type) + , Columns(columns) + , ShardsCount(shardsCount) + , ActiveShardsCount(activeShards) + {} + + TLogTableSharding(const Ydb::LogStore::DescribeLogTableResult& desc); +}; + class TLogTableDescription { public: - TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns, - ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings = {}); - TLogTableDescription(const TSchema& schema, const TVector<TString>& shardingColumns, - ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings = {}); - TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns, - ui32 columnShardCount, const THashMap<TString, TTier>& tiers); + TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding, + const TMaybe<TTtlSettings>& ttlSettings = {}); + TLogTableDescription(const TSchema& schema, const TLogTableSharding& sharding, + const TMaybe<TTtlSettings>& ttlSettings = {}); + TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding, + const THashMap<TString, TTier>& tiers); TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc, const TDescribeLogTableSettings& describeSettings); void SerializeTo(Ydb::LogStore::CreateLogTableRequest& request) const; const TSchema& GetSchema() const { return Schema; } + const TVector<TString>& GetShardingColumns() const { - return ShardingColumns; + return Sharding.Columns; } - ui32 GetColumnShardCount() const { - return ColumnShardCount; + ui32 GetShardsCount() const { + return Sharding.ShardsCount; } const TMaybe<TTtlSettings>& GetTtlSettings() const { return TtlSettings; @@ -179,8 +202,7 @@ public: private: const TString SchemaPresetName; const TSchema Schema; - const TVector<TString> ShardingColumns; - const ui32 ColumnShardCount; + const TLogTableSharding Sharding; const TMaybe<TTtlSettings> TtlSettings; THashMap<TString, TTier> Tiers; TString Owner; diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp index 3aa92c376c..221dcfc206 100644 --- a/ydb/services/ydb/ydb_logstore_ut.cpp +++ b/ydb/services/ydb/ydb_logstore_ut.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto res = logStoreClient.DescribeLogStore("/Root/LogStore").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); auto descr = res.GetDescription(); - UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4); + UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().size(), 1); UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().count("default"), 1); UNIT_ASSERT_VALUES_EQUAL(descr.GetOwner(), "root@builtin"); @@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto res = logStoreClient.DescribeLogStore("/Root/LogStore").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); auto descr = res.GetDescription(); - UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4); + UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().size(), 1); UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().count("default"), 1); UNIT_ASSERT_VALUES_EQUAL(descr.GetOwner(), "root@builtin"); @@ -266,7 +266,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { // Log table with intermediate dirs { - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding); auto res = logStoreClient.CreateLogTable("/Root/home/folder/LogStore/Dir1/Dir2/log1", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } @@ -291,7 +292,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } { - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log1", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } @@ -300,7 +302,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto res = logStoreClient.DescribeLogTable("/Root/LogStore/log1").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); auto descr = res.GetDescription(); - UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4); + UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); const auto& schema = descr.GetSchema(); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }"); @@ -312,7 +314,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } { - NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, sharding); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log2", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } @@ -321,7 +324,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto res = logStoreClient.DescribeLogTable("/Root/LogStore/log2").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); auto descr = res.GetDescription(); - UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4); + UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); const auto& schema = descr.GetSchema(); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }"); @@ -333,7 +336,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } { - NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, sharding); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log2", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } @@ -445,7 +449,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { // Create table without TTL settings { - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log1", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } @@ -459,7 +464,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { // Create table with TTL settings { NYdb::NLogStore::TTtlSettings ttlSettings("saved_at", TDuration::Seconds(2000)); - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log2", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } @@ -542,7 +548,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { // Use invalid column for TTL { NYdb::NLogStore::TTtlSettings ttlSettings("nonexisting_column", TDuration::Seconds(2000)); - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log3", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::GENERIC_ERROR, res.GetIssues().ToString()); } @@ -550,7 +557,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { // Use column of invalid type for TTL { NYdb::NLogStore::TTtlSettings ttlSettings("message", NYdb::NTable::TTtlSettings::EUnit::MilliSeconds, TDuration::Seconds(3600)); - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log4", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::GENERIC_ERROR, res.GetIssues().ToString()); } @@ -558,7 +566,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { // Use non-Timestamp column for TTL { NYdb::NLogStore::TTtlSettings ttlSettings("uint_timestamp", NYdb::NTable::TTtlSettings::EUnit::MilliSeconds, TDuration::Seconds(3600)); - NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings); + NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4); + NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings); auto res = logStoreClient.CreateLogTable("/Root/LogStore/log5", std::move(tableDescr)).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::GENERIC_ERROR, res.GetIssues().ToString()); } |