aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-09-23 20:35:10 +0300
committerchertus <azuikov@ydb.tech>2022-09-23 20:35:10 +0300
commit8d89d23cb935648b05b3528f8083f55b30e2c1be (patch)
tree06a68648c4a366f8a6b9318904acb93f628aebfc
parent7c520dd48df863dfd949097ee2842a4781031d31 (diff)
downloadydb-8d89d23cb935648b05b3528f8083f55b30e2c1be.tar.gz
sharding hash function in LogStore service
-rw-r--r--ydb/core/grpc_services/rpc_log_store.cpp17
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp8
-rw-r--r--ydb/core/protos/flat_scheme_op.proto5
-rw-r--r--ydb/public/api/protos/draft/ydb_logstore.proto18
-rw-r--r--ydb/public/lib/experimental/ydb_logstore.cpp60
-rw-r--r--ydb/public/lib/experimental/ydb_logstore.h52
-rw-r--r--ydb/services/ydb/ydb_logstore_ut.cpp35
7 files changed, 135 insertions, 60 deletions
diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp
index 2ae3f1ffcc..e82b54488f 100644
--- a/ydb/core/grpc_services/rpc_log_store.cpp
+++ b/ydb/core/grpc_services/rpc_log_store.cpp
@@ -205,7 +205,7 @@ private:
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore);
auto create = modifyScheme->MutableCreateColumnStore();
create->SetName(name);
- create->SetColumnShardCount(req->column_shard_count());
+ create->SetColumnShardCount(req->shards_count());
for (const auto& schemaPreset : req->schema_presets()) {
auto* toSchemaPreset = create->AddSchemaPresets();
toSchemaPreset->SetName(schemaPreset.name());
@@ -267,7 +267,7 @@ private:
}
ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true);
const auto& storeDescription = pathDescription.GetColumnStoreDescription();
- describeLogStoreResult.set_column_shard_count(storeDescription.GetColumnShardCount());
+ describeLogStoreResult.set_shards_count(storeDescription.GetColumnShardCount());
bool firstPreset = true;
for (const auto& schemaPreset : storeDescription.GetSchemaPresets()) {
@@ -455,9 +455,16 @@ private:
}
}
- create->SetColumnShardCount(req->column_shard_count());
+ create->SetColumnShardCount(req->shards_count());
auto* sharding = create->MutableSharding()->MutableHashSharding();
- sharding->SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS);
+ if (req->sharding_type() == Ydb::LogStore::ShardingHashType::HASH_TYPE_MODULO_N) {
+ sharding->SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N);
+ } else {
+ sharding->SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS);
+ if (req->active_shards_count()) {
+ sharding->SetActiveShardsCount(req->active_shards_count());
+ }
+ }
sharding->MutableColumns()->CopyFrom(req->sharding_columns());
ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
@@ -504,7 +511,7 @@ private:
}
ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true);
const auto& tableDescription = pathDescription.GetColumnTableDescription();
- describeLogTableResult.set_column_shard_count(tableDescription.GetColumnShardCount());
+ describeLogTableResult.set_shards_count(tableDescription.GetColumnShardCount());
Ydb::StatusIds::StatusCode status;
TString error;
if (!ConvertSchemaFromInternalToPublic(tableDescription.GetSchema(), *describeLogTableResult.mutable_schema(), status, error)) {
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 01dc3997c7..276a500ea7 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -107,11 +107,15 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
}
std::vector<ui32> rowSharding;
- if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_DEFAULT) {
+ if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N) {
NArrow::THashSharding sharding(numShards);
rowSharding = sharding.MakeSharding(batch, shardingColumns);
} else if (hashSharding.GetFunction() == NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS) {
- NArrow::TLogsSharding sharding(numShards);
+ ui32 activeShards = NArrow::TLogsSharding::DEFAULT_ACITVE_SHARDS;
+ if (hashSharding.HasActiveShardsCount()) {
+ activeShards = hashSharding.GetActiveShardsCount();
+ }
+ NArrow::TLogsSharding sharding(numShards, activeShards);
rowSharding = sharding.MakeSharding(batch, shardingColumns);
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 5d435b1fda..fb78fceeb7 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -590,7 +590,7 @@ message TColumnTableSharding {
message THashSharding {
enum EHashFunction {
// A simple hash modulo number of shards
- HASH_FUNCTION_DEFAULT = 0;
+ HASH_FUNCTION_MODULO_N = 0;
HASH_FUNCTION_CLOUD_LOGS = 1;
}
@@ -605,6 +605,9 @@ message TColumnTableSharding {
// changed and older data with the same column values may be left on
// a different shard.
optional bool UniqueShardKey = 3;
+
+ // Argument for HASH_FUNCTION_CLOUD_LOGS
+ optional uint32 ActiveShardsCount = 4;
}
oneof Method {
diff --git a/ydb/public/api/protos/draft/ydb_logstore.proto b/ydb/public/api/protos/draft/ydb_logstore.proto
index 75e44eae55..0f42cc5a9a 100644
--- a/ydb/public/api/protos/draft/ydb_logstore.proto
+++ b/ydb/public/api/protos/draft/ydb_logstore.proto
@@ -23,6 +23,12 @@ message Compression {
int32 compression_level = 2;
}
+enum ShardingHashType {
+ HASH_TYPE_UNSPECIFIED = 0x0000;
+ HASH_TYPE_MODULO_N = 0x0001; // hash(sharding_columns) % N
+ HASH_TYPE_LOGS_SPECIAL = 0x0002;
+}
+
message ColumnMeta {
string name = 1;
Type type = 2;
@@ -61,7 +67,7 @@ message CreateLogStoreRequest {
Ydb.Operations.OperationParams operation_params = 1;
string path = 2; // Full path
- uint32 column_shard_count = 3;
+ uint32 shards_count = 3;
repeated SchemaPreset schema_presets = 4;
repeated TierConfig tiers = 5;
}
@@ -79,7 +85,7 @@ message DescribeLogStoreRequest {
message DescribeLogStoreResult {
Ydb.Scheme.Entry self = 1; // Description of scheme object
- uint32 column_shard_count = 2;
+ uint32 shards_count = 2;
repeated SchemaPreset schema_presets = 3;
repeated TierConfig tiers = 4;
}
@@ -133,8 +139,10 @@ message CreateLogTableRequest {
};
// Specifies the desired number of ColumnShards for this table
- uint32 column_shard_count = 7;
+ uint32 shards_count = 7;
repeated string sharding_columns = 8;
+ ShardingHashType sharding_type = 9;
+ uint32 active_shards_count = 10; // optional parameter for HASH_TYPE_LOGS_SPECIAL
}
message CreateLogTableResponse {
@@ -158,8 +166,10 @@ message DescribeLogTableResult {
}
// Specifies the desired number of ColumnShards for this table
- uint32 column_shard_count = 6;
+ uint32 shards_count = 6;
repeated string sharding_columns = 7;
+ ShardingHashType sharding_type = 8;
+ uint32 active_shards_count = 9;
}
message DescribeLogTableResponse {
diff --git a/ydb/public/lib/experimental/ydb_logstore.cpp b/ydb/public/lib/experimental/ydb_logstore.cpp
index bfa5e1fe5b..c8b2189da6 100644
--- a/ydb/public/lib/experimental/ydb_logstore.cpp
+++ b/ydb/public/lib/experimental/ydb_logstore.cpp
@@ -104,16 +104,16 @@ void TSchema::SerializeTo(Ydb::LogStore::Schema& schema) const {
DefaultCompression.SerializeTo(*schema.mutable_default_compression());
}
-TLogStoreDescription::TLogStoreDescription(ui32 columnShardCount, const THashMap<TString, TSchema>& schemaPresets,
+TLogStoreDescription::TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets,
const THashMap<TString, TTierConfig>& tierConfigs)
- : ColumnShardCount(columnShardCount)
+ : ShardsCount(shardsCount)
, SchemaPresets(schemaPresets)
, TierConfigs(tierConfigs)
{}
TLogStoreDescription::TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc,
const TDescribeLogStoreSettings& describeSettings)
- : ColumnShardCount(desc.column_shard_count())
+ : ShardsCount(desc.shards_count())
, SchemaPresets()
, Owner(desc.self().owner())
{
@@ -138,7 +138,7 @@ void TLogStoreDescription::SerializeTo(Ydb::LogStore::CreateLogStoreRequest& req
pb.set_name(presetName);
presetSchema.SerializeTo(*pb.mutable_schema());
}
- request.set_column_shard_count(ColumnShardCount);
+ request.set_shards_count(ShardsCount);
for (const auto& [tierName, tierCfg] : TierConfigs) {
auto& pb = *request.add_tiers();
pb.set_name(tierName);
@@ -153,35 +153,44 @@ TDescribeLogStoreResult::TDescribeLogStoreResult(TStatus&& status, Ydb::LogStore
{}
-TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns,
- ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings)
+TLogTableSharding::TLogTableSharding(const Ydb::LogStore::DescribeLogTableResult& desc)
+ : Type(EShardingHashType::HASH_TYPE_UNSPECIFIED)
+ , Columns(desc.sharding_columns().begin(), desc.sharding_columns().end())
+ , ShardsCount(desc.shards_count())
+ , ActiveShardsCount(desc.active_shards_count())
+{
+ if (desc.sharding_type() == Ydb::LogStore::ShardingHashType::HASH_TYPE_MODULO_N) {
+ Type = EShardingHashType::HASH_TYPE_MODULO_N;
+ } else if (desc.sharding_type() == Ydb::LogStore::ShardingHashType::HASH_TYPE_LOGS_SPECIAL) {
+ Type = EShardingHashType::HASH_TYPE_LOGS_SPECIAL;
+ }
+}
+
+TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding,
+ const TMaybe<TTtlSettings>& ttlSettings)
: SchemaPresetName(schemaPresetName)
- , ShardingColumns(shardingColumns)
- , ColumnShardCount(columnShardCount)
+ , Sharding(sharding)
, TtlSettings(ttlSettings)
{}
-TLogTableDescription::TLogTableDescription(const TSchema& schema, const TVector<TString>& shardingColumns,
- ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings)
+TLogTableDescription::TLogTableDescription(const TSchema& schema, const TLogTableSharding& sharding,
+ const TMaybe<TTtlSettings>& ttlSettings)
: Schema(schema)
- , ShardingColumns(shardingColumns)
- , ColumnShardCount(columnShardCount)
+ , Sharding(sharding)
, TtlSettings(ttlSettings)
{}
-TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns,
- ui32 columnShardCount, const THashMap<TString, TTier>& tiers)
+TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding,
+ const THashMap<TString, TTier>& tiers)
: SchemaPresetName(schemaPresetName)
- , ShardingColumns(shardingColumns)
- , ColumnShardCount(columnShardCount)
+ , Sharding(sharding)
, Tiers(tiers)
{}
TLogTableDescription::TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc,
const TDescribeLogTableSettings& describeSettings)
: Schema(desc.schema())
- , ShardingColumns(desc.sharding_columns().begin(), desc.sharding_columns().end())
- , ColumnShardCount(desc.column_shard_count())
+ , Sharding(desc)
, TtlSettings(TtlSettingsFromProto(desc.ttl_settings()))
, Owner(desc.self().owner())
{
@@ -195,10 +204,21 @@ void TLogTableDescription::SerializeTo(Ydb::LogStore::CreateLogTableRequest& req
Schema.SerializeTo(*request.mutable_schema());
}
request.set_schema_preset_name(SchemaPresetName);
- request.set_column_shard_count(ColumnShardCount);
- for (const auto& sc : ShardingColumns) {
+ request.set_shards_count(Sharding.ShardsCount);
+ if (Sharding.ActiveShardsCount) {
+ request.set_active_shards_count(Sharding.ActiveShardsCount);
+ }
+ for (const auto& sc : Sharding.Columns) {
request.add_sharding_columns(sc);
}
+ Ydb::LogStore::ShardingHashType shardingType = (Sharding.Type == NYdb::NLogStore::HASH_TYPE_MODULO_N) ?
+ Ydb::LogStore::ShardingHashType::HASH_TYPE_MODULO_N :
+ Ydb::LogStore::ShardingHashType::HASH_TYPE_LOGS_SPECIAL;
+ request.set_sharding_type(shardingType);
+ if (Sharding.ActiveShardsCount) {
+ request.set_active_shards_count(Sharding.ActiveShardsCount);
+ }
+
if (TtlSettings) {
TtlSettings->SerializeTo(*request.mutable_ttl_settings());
}
diff --git a/ydb/public/lib/experimental/ydb_logstore.h b/ydb/public/lib/experimental/ydb_logstore.h
index 949f0cb656..2e04dfb5fc 100644
--- a/ydb/public/lib/experimental/ydb_logstore.h
+++ b/ydb/public/lib/experimental/ydb_logstore.h
@@ -26,6 +26,12 @@ enum class EColumnCompression {
ZSTD
};
+enum EShardingHashType {
+ HASH_TYPE_UNSPECIFIED,
+ HASH_TYPE_MODULO_N,
+ HASH_TYPE_LOGS_SPECIAL,
+};
+
struct TCompression {
EColumnCompression Codec = EColumnCompression::LZ4;
TMaybe<int> Level;
@@ -111,15 +117,15 @@ private:
class TLogStoreDescription {
public:
- TLogStoreDescription(ui32 columnShardCount, const THashMap<TString, TSchema>& schemaPresets,
+ TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets,
const THashMap<TString, TTierConfig>& tierConfigs = {});
TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc, const TDescribeLogStoreSettings& describeSettings);
void SerializeTo(Ydb::LogStore::CreateLogStoreRequest& request) const;
const THashMap<TString, TSchema>& GetSchemaPresets() const {
return SchemaPresets;
}
- ui32 GetColumnShardCount() const {
- return ColumnShardCount;
+ ui32 GetShardsCount() const {
+ return ShardsCount;
}
const TString& GetOwner() const {
return Owner;
@@ -135,7 +141,7 @@ public:
}
private:
- ui32 ColumnShardCount;
+ ui32 ShardsCount;
THashMap<TString, TSchema> SchemaPresets;
TString Owner;
TVector<NScheme::TPermissions> Permissions;
@@ -143,24 +149,41 @@ private:
THashMap<TString, TTierConfig> TierConfigs;
};
+struct TLogTableSharding {
+ EShardingHashType Type;
+ TVector<TString> Columns;
+ ui32 ShardsCount;
+ ui32 ActiveShardsCount;
+
+ TLogTableSharding(EShardingHashType type, const TVector<TString>& columns, ui32 shardsCount, ui32 activeShards = 0)
+ : Type(type)
+ , Columns(columns)
+ , ShardsCount(shardsCount)
+ , ActiveShardsCount(activeShards)
+ {}
+
+ TLogTableSharding(const Ydb::LogStore::DescribeLogTableResult& desc);
+};
+
class TLogTableDescription {
public:
- TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns,
- ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings = {});
- TLogTableDescription(const TSchema& schema, const TVector<TString>& shardingColumns,
- ui32 columnShardCount, const TMaybe<TTtlSettings>& ttlSettings = {});
- TLogTableDescription(const TString& schemaPresetName, const TVector<TString>& shardingColumns,
- ui32 columnShardCount, const THashMap<TString, TTier>& tiers);
+ TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding,
+ const TMaybe<TTtlSettings>& ttlSettings = {});
+ TLogTableDescription(const TSchema& schema, const TLogTableSharding& sharding,
+ const TMaybe<TTtlSettings>& ttlSettings = {});
+ TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding,
+ const THashMap<TString, TTier>& tiers);
TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc, const TDescribeLogTableSettings& describeSettings);
void SerializeTo(Ydb::LogStore::CreateLogTableRequest& request) const;
const TSchema& GetSchema() const {
return Schema;
}
+
const TVector<TString>& GetShardingColumns() const {
- return ShardingColumns;
+ return Sharding.Columns;
}
- ui32 GetColumnShardCount() const {
- return ColumnShardCount;
+ ui32 GetShardsCount() const {
+ return Sharding.ShardsCount;
}
const TMaybe<TTtlSettings>& GetTtlSettings() const {
return TtlSettings;
@@ -179,8 +202,7 @@ public:
private:
const TString SchemaPresetName;
const TSchema Schema;
- const TVector<TString> ShardingColumns;
- const ui32 ColumnShardCount;
+ const TLogTableSharding Sharding;
const TMaybe<TTtlSettings> TtlSettings;
THashMap<TString, TTier> Tiers;
TString Owner;
diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp
index 3aa92c376c..221dcfc206 100644
--- a/ydb/services/ydb/ydb_logstore_ut.cpp
+++ b/ydb/services/ydb/ydb_logstore_ut.cpp
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto res = logStoreClient.DescribeLogStore("/Root/LogStore").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
auto descr = res.GetDescription();
- UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().count("default"), 1);
UNIT_ASSERT_VALUES_EQUAL(descr.GetOwner(), "root@builtin");
@@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto res = logStoreClient.DescribeLogStore("/Root/LogStore").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
auto descr = res.GetDescription();
- UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().count("default"), 1);
UNIT_ASSERT_VALUES_EQUAL(descr.GetOwner(), "root@builtin");
@@ -266,7 +266,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
// Log table with intermediate dirs
{
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding);
auto res = logStoreClient.CreateLogTable("/Root/home/folder/LogStore/Dir1/Dir2/log1", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
@@ -291,7 +292,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
{
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log1", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
@@ -300,7 +302,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto res = logStoreClient.DescribeLogTable("/Root/LogStore/log1").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
auto descr = res.GetDescription();
- UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
const auto& schema = descr.GetSchema();
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10);
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }");
@@ -312,7 +314,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
{
- NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, sharding);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log2", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
@@ -321,7 +324,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto res = logStoreClient.DescribeLogTable("/Root/LogStore/log2").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
auto descr = res.GetDescription();
- UNIT_ASSERT_VALUES_EQUAL(descr.GetColumnShardCount(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
const auto& schema = descr.GetSchema();
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10);
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }");
@@ -333,7 +336,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
{
- NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr(logSchema, sharding);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log2", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
@@ -445,7 +449,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
// Create table without TTL settings
{
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log1", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
@@ -459,7 +464,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
// Create table with TTL settings
{
NYdb::NLogStore::TTtlSettings ttlSettings("saved_at", TDuration::Seconds(2000));
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log2", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
@@ -542,7 +548,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
// Use invalid column for TTL
{
NYdb::NLogStore::TTtlSettings ttlSettings("nonexisting_column", TDuration::Seconds(2000));
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log3", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::GENERIC_ERROR, res.GetIssues().ToString());
}
@@ -550,7 +557,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
// Use column of invalid type for TTL
{
NYdb::NLogStore::TTtlSettings ttlSettings("message", NYdb::NTable::TTtlSettings::EUnit::MilliSeconds, TDuration::Seconds(3600));
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log4", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::GENERIC_ERROR, res.GetIssues().ToString());
}
@@ -558,7 +566,8 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
// Use non-Timestamp column for TTL
{
NYdb::NLogStore::TTtlSettings ttlSettings("uint_timestamp", NYdb::NTable::TTtlSettings::EUnit::MilliSeconds, TDuration::Seconds(3600));
- NYdb::NLogStore::TLogTableDescription tableDescr("default", {"timestamp", "uid"}, 4, ttlSettings);
+ NYdb::NLogStore::TLogTableSharding sharding(NYdb::NLogStore::HASH_TYPE_LOGS_SPECIAL, {"timestamp", "uid"}, 4);
+ NYdb::NLogStore::TLogTableDescription tableDescr("default", sharding, ttlSettings);
auto res = logStoreClient.CreateLogTable("/Root/LogStore/log5", std::move(tableDescr)).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::GENERIC_ERROR, res.GetIssues().ToString());
}