diff options
author | kruall <kruall@ydb.tech> | 2025-02-25 17:31:23 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-25 14:31:23 +0000 |
commit | 8afb53071bf69c3b6773832fd14286251f5f1121 (patch) | |
tree | 6f0a33494241c3ccb975c38cdca2a4bd019bd530 | |
parent | b73ee29b2716f5e3243592e9c9270e739b137912 (diff) | |
download | ydb-8afb53071bf69c3b6773832fd14286251f5f1121.tar.gz |
Fix describe_volume and alter_volume (#14978)
-rw-r--r-- | ydb/core/grpc_services/rpc_keyvalue.cpp | 151 | ||||
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_describer.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_base/ut_base.cpp | 150 | ||||
-rw-r--r-- | ydb/services/keyvalue/grpc_service_ut.cpp | 2 |
6 files changed, 258 insertions, 66 deletions
diff --git a/ydb/core/grpc_services/rpc_keyvalue.cpp b/ydb/core/grpc_services/rpc_keyvalue.cpp index 5aa80c5147..6baba52436 100644 --- a/ydb/core/grpc_services/rpc_keyvalue.cpp +++ b/ydb/core/grpc_services/rpc_keyvalue.cpp @@ -432,58 +432,6 @@ public: } }; -class TAlterVolumeRequest : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> { -public: - using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>; - using TBase::TBase; - - void Bootstrap(const TActorContext& ctx) { - TBase::Bootstrap(ctx); - Become(&TAlterVolumeRequest::StateFunc); - SendProposeRequest(ctx); - } - - void SendProposeRequest(const TActorContext &ctx) { - const auto req = this->GetProtoRequest(); - - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception& ex) { - Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); - return Reply(StatusIds::BAD_REQUEST, ctx); - } - const auto& workingDir = pathPair.first; - const auto& name = pathPair.second; - - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction(); - NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; - NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); - modifyScheme->SetWorkingDir(workingDir); - NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr; - - modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume); - tableDesc = modifyScheme->MutableAlterSolomonVolume(); - tableDesc->SetName(name); - tableDesc->SetPartitionCount(req->alter_partition_count()); - - if (GetProtoRequest()->has_storage_config()) { - tableDesc->SetUpdateChannelsBinding(true); - auto &storageConfig = GetProtoRequest()->storage_config(); - auto *internalStorageConfig = tableDesc->MutableStorageConfig(); - AssignPoolKinds(storageConfig, internalStorageConfig); - } else { - tableDesc->SetUpdateChannelsBinding(false); - tableDesc->SetChannelProfileId(0); - } - - ctx.Send(MakeTxProxyID(), proposeRequest.release()); - } - - STFUNC(StateFunc) { - return TBase::StateWork(ev); - } -}; template <typename TDerived> class TBaseKeyValueRequest { @@ -621,12 +569,10 @@ protected: Ydb::KeyValue::DescribeVolumeResult result; result.set_path(this->GetProtoRequest()->path()); result.set_partition_count(desc.PartitionsSize()); - if (desc.PartitionsSize() > 0) { - auto *storageConfig = result.mutable_storage_config(); - for (auto &channel : desc.GetPartitions(0).GetBoundChannels()) { - auto *channelBind = storageConfig->add_channel(); - channelBind->set_media(channel.GetStoragePoolName()); - } + auto *storageConfig = result.mutable_storage_config(); + for (auto &channel : desc.GetBoundChannels()) { + auto *channelBind = storageConfig->add_channel(); + channelBind->set_media(channel.GetStoragePoolName()); } this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext()); } @@ -640,6 +586,95 @@ private: }; +class TAlterVolumeRequest + : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> + , public TBaseKeyValueRequest<TAlterVolumeRequest> +{ +public: + using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>; + using TBase::TBase; + using TBaseKeyValueRequest::TBaseKeyValueRequest; + friend class TBaseKeyValueRequest<TAlterVolumeRequest>; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + Become(&TAlterVolumeRequest::StateWork); + if (GetProtoRequest()->has_storage_config()) { + StorageConfig = GetProtoRequest()->storage_config(); + SendProposeRequest(ctx); + } else { + OnBootstrap(); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) { + return true; + } + + void SendProposeRequest(const TActorContext &ctx) { + const auto req = this->GetProtoRequest(); + + std::pair<TString, TString> pathPair; + try { + pathPair = SplitPath(req->path()); + } catch (const std::exception& ex) { + Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); + return Reply(StatusIds::BAD_REQUEST, ctx); + } + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + + std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction(); + NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; + NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); + modifyScheme->SetWorkingDir(workingDir); + NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr; + + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume); + tableDesc = modifyScheme->MutableAlterSolomonVolume(); + tableDesc->SetName(name); + tableDesc->SetPartitionCount(req->alter_partition_count()); + + if (GetProtoRequest()->has_storage_config()) { + tableDesc->SetUpdateChannelsBinding(true); + } + auto *internalStorageConfig = tableDesc->MutableStorageConfig(); + AssignPoolKinds(StorageConfig, internalStorageConfig); + + ctx.Send(MakeTxProxyID(), proposeRequest.release()); + } + + STFUNC(StateWork) { + Cerr << "TAlterVolumeRequest::StateWork; received event: " << ev->GetTypeName() << Endl; + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + return TBase::StateWork(ev); + } + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) { + TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get(); + NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get(); + + if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) { + return; + } + + const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description; + for (auto &channel : desc.GetBoundChannels()) { + auto *channelBind = StorageConfig.add_channel(); + channelBind->set_media(channel.GetStoragePoolName()); + } + SendProposeRequest(TActivationContext::AsActorContext()); + } + +private: + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + Ydb::KeyValue::StorageConfig StorageConfig; +}; + + class TListLocalPartitionsRequest : public TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest> , public TBaseKeyValueRequest<TListLocalPartitionsRequest> diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 9146ea686d..d63b0e157d 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1362,6 +1362,7 @@ message TSolomonVolumeDescription { optional uint64 PathId = 2; optional uint64 PartitionCount = 3; repeated TPartition Partitions = 4; + repeated NKikimrStoragePool.TChannelBind BoundChannels = 5; } message TCreateSolomonVolume { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp index 611f9a3bca..cc51162ca4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp @@ -254,17 +254,16 @@ public: return result; } - if (!alter.HasChannelProfileId()) { - result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please"); - return result; - } - TChannelsBindings channelsBinding; bool isResolved = false; if (alter.HasStorageConfig()) { isResolved = context.SS->ResolveSolomonChannels(alter.GetStorageConfig(), path.GetPathIdForDomain(), channelsBinding); } else { - isResolved = context.SS->ResolveSolomonChannels(channelProfileId, path.GetPathIdForDomain(), channelsBinding); + if (!alter.HasChannelProfileId()) { + result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please"); + return result; + } + isResolved = context.SS->ResolveSolomonChannels(alter.GetChannelProfileId(), path.GetPathIdForDomain(), channelsBinding); } if (!isResolved) { result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool"); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 6bdeec4708..6d5516ff4e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -810,6 +810,15 @@ void TPathDescriber::DescribeSolomonVolume(TPathId pathId, TPathElement::TPtr pa } } + if (solomonVolumeInfo->Partitions.size() > 0) { + auto shardId = solomonVolumeInfo->Partitions.begin()->first; + auto shardInfo = Self->ShardInfos.FindPtr(shardId); + Y_ABORT_UNLESS(shardInfo); + for (const auto& channel : shardInfo->BindedChannels) { + entry->AddBoundChannels()->CopyFrom(channel); + } + } + Sort(entry->MutablePartitions()->begin(), entry->MutablePartitions()->end(), [] (auto& part1, auto& part2) { diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index 4b681e61b8..f71e0e5d8b 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -9216,7 +9216,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { "PartitionCount: 40 "); env.TestWaitNotification(runtime, txId); TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"), - {NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40)}); + {NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40), + [](const NKikimrScheme::TEvDescribeSchemeResult& result){ + const auto& desc = result.GetPathDescription().GetSolomonDescription(); + const auto& boundChannels = desc.GetBoundChannels(); + UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1"); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1"); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1"); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1"); + }}); // Already exists TestCreateSolomon(runtime, ++txId, "/MyRoot", "Name: \"Solomon\" " @@ -9317,7 +9326,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { env.TestWaitNotification(runtime, txId); TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"), - {NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4)}); + {NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4), + [](const NKikimrScheme::TEvDescribeSchemeResult& result){ + const auto& desc = result.GetPathDescription().GetSolomonDescription(); + const auto& boundChannels = desc.GetBoundChannels(); + UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1"); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1"); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1"); + UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1"); + }}); TestDropSolomon(runtime, ++txId, "/MyRoot", "Solomon"); env.TestWaitNotification(runtime, txId); @@ -9430,6 +9448,134 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { UpdateChannelsBindingSolomon(true); } + void UpdateChannelsBindingSolomonStorageConfig() { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().AllowUpdateChannelsBindingOfSolomonPartitions(true)); + ui64 txId = 100; + + auto check = [&](const TString& path, ui64 shards, const THashMap<TString, ui32>& expectedChannels) { + NKikimrSchemeOp::TDescribeOptions opts; + opts.SetReturnChannelsBinding(true); + + auto makeChannels = [](const auto& boundsChannels) { + THashMap<TString, ui32> channels; + for (const auto& channel : boundsChannels) { + channels[channel.GetStoragePoolName()]++; + } + return channels; + }; + + TestDescribeResult(DescribePath(runtime, path, opts), { + NLs::Finished, + NLs::ShardsInsideDomain(shards), + [&expectedChannels, &makeChannels, &shards] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + const auto& desc = record.GetPathDescription().GetSolomonDescription(); + + UNIT_ASSERT_VALUES_EQUAL(shards, desc.PartitionsSize()); + + for (size_t i = 0; i < desc.PartitionsSize(); ++i) { + const auto& partition = desc.GetPartitions(i); + + THashMap<TString, ui32> channels = makeChannels(partition.GetBoundChannels()); + UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), channels.size()); + + for (const auto& [name, count] : expectedChannels) { + UNIT_ASSERT_C(channels.contains(name), "Cannot find channel: " << name); + UNIT_ASSERT_VALUES_EQUAL(channels.at(name), count); + } + } + + THashMap<TString, ui32> volumeChannels = makeChannels(desc.GetBoundChannels()); + UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), volumeChannels.size()); + + for (const auto& [name, count] : expectedChannels) { + UNIT_ASSERT_C(volumeChannels.contains(name), "Cannot find channel: " << name); + UNIT_ASSERT_VALUES_EQUAL(volumeChannels.at(name), count); + } + } + }); + }; + + TestCreateSolomon(runtime, ++txId, "/MyRoot", R"( + Name: "Solomon" + PartitionCount: 1 + StorageConfig { + Channel { + PreferredPoolKind: "pool-kind-1" + } + Channel { + PreferredPoolKind: "pool-kind-1" + } + Channel { + PreferredPoolKind: "pool-kind-1" + } + } + )"); + + env.TestWaitNotification(runtime, txId); + check("/MyRoot/Solomon", 1, {{{"pool-1", 3}}}); + // case 1: empty alter + TestAlterSolomon(runtime, ++txId, "/MyRoot", R"( + Name: "Solomon" + StorageConfig { + Channel { + PreferredPoolKind: "pool-kind-2" + } + Channel { + PreferredPoolKind: "pool-kind-2" + } + Channel { + PreferredPoolKind: "pool-kind-2" + } + } + )", {NKikimrScheme::StatusInvalidParameter}); + + // case 2: add partition + TestAlterSolomon(runtime, ++txId, "/MyRoot", R"( + Name: "Solomon" + PartitionCount: 2 + StorageConfig { + Channel { + PreferredPoolKind: "pool-kind-1" + } + Channel { + PreferredPoolKind: "pool-kind-1" + } + Channel { + PreferredPoolKind: "pool-kind-1" + } + } + )"); + + env.TestWaitNotification(runtime, txId); + check("/MyRoot/Solomon", 2, {{"pool-1", 3}}); + + // case 3: add partition & update channels binding + TestAlterSolomon(runtime, ++txId, "/MyRoot", R"( + Name: "Solomon" + PartitionCount: 3 + UpdateChannelsBinding: true + StorageConfig { + Channel { + PreferredPoolKind: "pool-kind-2" + } + Channel { + PreferredPoolKind: "pool-kind-2" + } + Channel { + PreferredPoolKind: "pool-kind-2" + } + } + )"); + + env.TestWaitNotification(runtime, txId); + check("/MyRoot/Solomon", 3, {{"pool-2", 3}}); + } + + Y_UNIT_TEST(UpdateChannelsBindingSolomonStorageConfig) { + UpdateChannelsBindingSolomonStorageConfig(); + } + Y_UNIT_TEST(RejectAlterSolomon) { //+ TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/services/keyvalue/grpc_service_ut.cpp b/ydb/services/keyvalue/grpc_service_ut.cpp index 2b1b1a50a1..4c3da154d0 100644 --- a/ydb/services/keyvalue/grpc_service_ut.cpp +++ b/ydb/services/keyvalue/grpc_service_ut.cpp @@ -813,6 +813,7 @@ Y_UNIT_TEST_SUITE(KeyValueGRPCService) { auto describeVolumeResult = DescribeVolume(channel, tablePath); UNIT_ASSERT_VALUES_EQUAL(1, describeVolumeResult.partition_count()); UNIT_ASSERT(describeVolumeResult.has_storage_config()); + UNIT_ASSERT_VALUES_EQUAL(describeVolumeResult.storage_config().channel_size(), 3); for (const auto& channel : describeVolumeResult.storage_config().channel()) { UNIT_ASSERT_VALUES_EQUAL(channel.media(), "ssd"); } @@ -825,6 +826,7 @@ Y_UNIT_TEST_SUITE(KeyValueGRPCService) { describeVolumeResult = DescribeVolume(channel, tablePath); UNIT_ASSERT_VALUES_EQUAL(2, describeVolumeResult.partition_count()); UNIT_ASSERT(describeVolumeResult.has_storage_config()); + UNIT_ASSERT_VALUES_EQUAL(describeVolumeResult.storage_config().channel_size(), 3); for (const auto& channel : describeVolumeResult.storage_config().channel()) { UNIT_ASSERT_VALUES_EQUAL(channel.media(), "ssd"); } |