diff options
| author | FloatingCrowbar <[email protected]> | 2024-08-08 17:03:42 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-08-08 14:03:42 +0000 |
| commit | bb76355267b4b29abca62e1a0ba1ee797bca7f54 (patch) | |
| tree | 4bfc5cf171f296e1eb91cd676e2f2003caad8658 | |
| parent | f7c4264a7287d30f19fc67ebcf41907f1e9fa01d (diff) | |
Topic control plane DDL support for query service (#7438)
29 files changed, 989 insertions, 350 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp index f498a4ee092..55600858669 100644 --- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp @@ -34,14 +34,14 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt public: TAlterConfigsActor( - TActorId requester, + TActorId requester, TIntrusiveConstPtr<NACLib::TUserToken> userToken, TString topicPath, TString databaseName, std::optional<ui64> retentionMs, std::optional<ui64> retentionBytes) : TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>( - requester, + requester, userToken, topicPath, databaseName) @@ -54,12 +54,12 @@ public: ~TAlterConfigsActor() = default; void ModifyPersqueueConfig( - const TActorContext& ctx, + NKikimr::TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { - Y_UNUSED(ctx); + Y_UNUSED(appData); Y_UNUSED(pqGroupDescription); Y_UNUSED(selfInfo); @@ -150,7 +150,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { resource.ResourceName.value(), Context->DatabasePath, convertedRetentions.Ms, - convertedRetentions.Bytes + convertedRetentions.Bytes )); InflyTopics++; @@ -201,7 +201,7 @@ void TKafkaAlterConfigsActor::Reply(const TActorContext& ctx) { responseResource.ErrorCode = INVALID_REQUEST; response->Responses.push_back(responseResource); responseStatus = INVALID_REQUEST; - } + } Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus)); diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp index 6e13bd61cc7..521f8598f9b 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp @@ -215,13 +215,13 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T public: TCreatePartitionsActor( - TActorId requester, + TActorId requester, TIntrusiveConstPtr<NACLib::TUserToken> userToken, TString topicPath, TString databaseName, ui32 partitionsNumber) : TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest>( - requester, + requester, userToken, topicPath, databaseName) @@ -234,12 +234,12 @@ public: }; void ModifyPersqueueConfig( - const TActorContext& ctx, + NKikimr::TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo - ) { - Y_UNUSED(ctx); + ) override { + Y_UNUSED(appData); Y_UNUSED(pqGroupDescription); Y_UNUSED(selfInfo); @@ -347,7 +347,7 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) { response->Results.push_back(responseTopic); responseStatus = INVALID_REQUEST; - } + } Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus)); Die(ctx); diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index 3fad0055a1b..44af7beb5ff 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -16,7 +16,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre public: TCreateTopicActor( - TActorId requester, + TActorId requester, TIntrusiveConstPtr<NACLib::TUserToken> userToken, TString topicPath, TString databaseName, @@ -78,13 +78,13 @@ public: name, topicRequest, modifyScheme, - ctx, + NKikimr::AppData(ctx), error, workingDir, proposal.Record.GetDatabaseName() ); if (codes.YdbCode != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx); + return ReplyWithError(codes.YdbCode, codes.PQCode, error); } }; @@ -192,7 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { TopicNamesToRetentions[topicName] = std::pair<std::optional<ui64>, std::optional<ui64>>( convertedRetentions.Ms, - convertedRetentions.Bytes + convertedRetentions.Bytes ); ctx.Register(new TCreateTopicActor( @@ -202,7 +202,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { Context->DatabasePath, topic.NumPartitions, convertedRetentions.Ms, - convertedRetentions.Bytes + convertedRetentions.Bytes )); InflyTopics++; @@ -243,7 +243,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) { responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message; } - auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) { + auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) { if (configValue.has_value()) { TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs config; config.Name = configName; @@ -271,7 +271,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) { responseTopic.ErrorMessage = "Duplicate topic in request."; response->Topics.push_back(responseTopic); responseStatus = INVALID_REQUEST; - } + } Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus)); diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index f3bacc32b39..14855d40c1d 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -208,8 +208,8 @@ struct TGetOffsetsRequest : public NKikimr::NGRpcProxy::V1::TLocalRequestBase { TVector<ui32> PartitionIds; }; -struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse> - , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase +struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse> + , public NKikimr::NGRpcProxy::V1::TLocalResponseBase { TEvTopicOffsetsResponse() {} @@ -217,8 +217,8 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp TVector<TPartitionOffsetsInfo> Partitions; }; -struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse> - , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase +struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse> + , public NKikimr::NGRpcProxy::V1::TLocalResponseBase { TEvCommitedOffsetsResponse() {} @@ -228,8 +228,8 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets; }; -struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse> - , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase +struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse> + , public NKikimr::NGRpcProxy::V1::TLocalResponseBase { enum EStatus { OK, diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index f61cb111663..15835cf536e 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -308,11 +308,29 @@ public: break; } + case NKqpProto::TKqpSchemeOperation::kCreateTopic: { + const auto& modifyScheme = schemeOp.GetCreateTopic(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kAlterTopic: { + const auto& modifyScheme = schemeOp.GetAlterTopic(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kDropTopic: { + const auto& modifyScheme = schemeOp.GetDropTopic(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + case NKqpProto::TKqpSchemeOperation::kAnalyzeTable: { const auto& analyzeOperation = schemeOp.GetAnalyzeTable(); - + auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>(); - + TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()}; IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise); @@ -326,9 +344,10 @@ public: actorSystem->Send(selfId, ev.Release()); }); - + Become(&TKqpSchemeExecuter::ExecuteState); return; + } default: @@ -459,7 +478,7 @@ public: } void Handle(TEvPrivate::TEvMakeTempDirResult::TPtr& result) { - if (!result->Get()->Result.Success()) { + if (!result->Get()->Result.Success()) { InternalError(TStringBuilder() << "Error creating temporary directory for session " << SessionId << ": " << result->Get()->Result.Issues().ToString(true)); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 4990244c7a9..f33bd700ae3 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -977,7 +977,11 @@ public: return NotImplemented<TGenericResult>(); } - TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override { + TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override { + if (existingOk) { + return MakeFuture(ResultFromError<TGenericResult>("IF NOT EXISTS statement is not supported for CREATE TOPIC in yql script")); + } + try { if (!CheckCluster(cluster)) { return InvalidCluster<TGenericResult>(cluster); @@ -989,9 +993,27 @@ public: catch (yexception& e) { return MakeFuture(ResultFromException<TGenericResult>(e)); } + Y_UNUSED(existingOk); } - TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override { + TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(NYql::TAlterTopicSettings&& settings) override { + auto schemaTxPromise = NewPromise<NKikimr::NGRpcProxy::V1::TAlterTopicResponse>(); + auto schemaTxFuture = schemaTxPromise.GetFuture(); + + NKikimr::NGRpcProxy::V1::TAlterTopicRequest request{ + std::move(settings.Request), settings.WorkDir, settings.Name, Database, GetTokenCompat(), + settings.MissingOk + }; + IActor* requestHandler = new NKikimr::NGRpcProxy::V1::TAlterTopicActorInternal(std::move(request), std::move(schemaTxPromise), settings.MissingOk); + RegisterActor(requestHandler); + return schemaTxFuture; + } + + TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override { + if (missingOk) { + return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for ALTER TOPIC in yql script")); + } + try { if (!CheckCluster(cluster)) { return InvalidCluster<TGenericResult>(cluster); @@ -1005,7 +1027,11 @@ public: } } - TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override { + TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override { + if (missingOk) { + return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for DROP TOPIC in yql script")); + } + try { if (!CheckCluster(cluster)) { return InvalidCluster<TGenericResult>(cluster); @@ -1020,6 +1046,7 @@ public: catch (yexception& e) { return MakeFuture(ResultFromException<TGenericResult>(e)); } + } TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override { diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 901e6e3066c..87f6d092118 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -8,6 +8,7 @@ #include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/parser.h> #include <ydb/services/metadata/abstract/kqp_common.h> +#include <ydb/services/lib/actors/pq_schema_actor.h> namespace NKikimr::NKqp { @@ -108,7 +109,7 @@ bool ConvertDataSlotToYdbTypedValue(NYql::EDataSlot fromType, const TString& fro case NYql::EDataSlot::Interval64: toType->set_type_id(Ydb::Type::INTERVAL64); toValue->set_int64_value(FromString<i64>(fromValue)); - break; + break; default: return false; } @@ -907,16 +908,120 @@ public: return dropPromise.GetFuture(); } - TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override { - FORWARD_ENSURE_NO_PREPARE(CreateTopic, cluster, std::move(request)); + TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override { + CHECK_PREPARED_DDL(CreateTopic); + Y_UNUSED(cluster); + + std::pair<TString, TString> pathPair; + TString error; + auto createPromise = NewPromise<TGenericResult>(); + if (!NSchemeHelpers::SplitTablePath(request.path(), GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); + + schemeTx.SetWorkingDir(pathPair.first); + + auto pqDescr = schemeTx.MutableCreatePersQueueGroup(); + pqDescr->SetName(pathPair.second); + NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(pathPair.second, request, schemeTx, AppData(ActorSystem), error, pathPair.first); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + + + phyTx.MutableSchemeOperation()->MutableCreateTopic()->Swap(&schemeTx); + phyTx.MutableSchemeOperation()->MutableCreateTopic()->SetFailedOnAlreadyExists(!existingOk); + TGenericResult result; + result.SetSuccess(); + createPromise.SetValue(result); + } else { + return Gateway->CreateTopic(cluster, std::move(request), existingOk); + } + return createPromise.GetFuture(); } - TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override { - FORWARD_ENSURE_NO_PREPARE(AlterTopic, cluster, std::move(request)); + TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override { + CHECK_PREPARED_DDL(AlterTopic); + Y_UNUSED(cluster); + std::pair<TString, TString> pathPair; + TString error; + if (!NSchemeHelpers::SplitTablePath(request.path(), GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + auto alterPromise = NewPromise<TGenericResult>(); + + if (IsPrepare()) { + TAlterTopicSettings settings{std::move(request), pathPair.second, pathPair.first, missingOk}; + auto getModifySchemeFuture = Gateway->AlterTopicPrepared(std::move(settings)); + + + auto* phyQuery = SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + + getModifySchemeFuture.Subscribe([=] (const auto future) mutable { + TGenericResult result; + auto modifySchemeResult = future.GetValue(); + if (modifySchemeResult.Status == Ydb::StatusIds::SUCCESS) { + if (modifySchemeResult.ModifyScheme.HasAlterPersQueueGroup()) { + auto* phyTx = phyQuery->AddTransactions(); + phyTx->SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx->MutableSchemeOperation()->MutableAlterTopic()->Swap(&modifySchemeResult.ModifyScheme); + phyTx->MutableSchemeOperation()->MutableAlterTopic()->SetSuccessOnNotExist(missingOk); + } + result.SetSuccess(); + + } else { + result.SetStatus(NYql::YqlStatusFromYdbStatus(modifySchemeResult.Status)); + result.AddIssues(modifySchemeResult.Issues); + } + alterPromise.SetValue(result); + }); + + } else { + return Gateway->AlterTopic(cluster, std::move(request), missingOk); + } + return alterPromise.GetFuture(); + } - TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override { - FORWARD_ENSURE_NO_PREPARE(DropTopic, cluster, topic); + NThreading::TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(TAlterTopicSettings&& settings) override { + return Gateway->AlterTopicPrepared(std::move(settings)); + } + + TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override { + CHECK_PREPARED_DDL(DropTopic); + Y_UNUSED(cluster); + + std::pair<TString, TString> pathPair; + TString error; + auto dropPromise = NewPromise<TGenericResult>(); + if (!NSchemeHelpers::SplitTablePath(topic, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup); + + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.MutableDrop()->SetName(pathPair.second); + + phyTx.MutableSchemeOperation()->MutableDropTopic()->Swap(&schemeTx); + phyTx.MutableSchemeOperation()->MutableDropTopic()->SetSuccessOnNotExist(missingOk); + TGenericResult result; + result.SetSuccess(); + dropPromise.SetValue(result); + } else { + return Gateway->DropTopic(cluster, topic, missingOk); + } + return dropPromise.GetFuture(); } TFuture<TGenericResult> ModifyPermissions(const TString& cluster, diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index c1675eeba43..0831fb24ee6 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -1168,7 +1168,8 @@ public: YQL_ENSURE(settings.Mode); auto mode = settings.Mode.Cast(); - if (mode == "create") { + if (mode == "create" || mode == "create_if_not_exists") { + bool existingOk = mode == "create_if_not_exists"; return Build<TKiCreateTopic>(ctx, node->Pos()) .World(node->Child(0)) .DataSink(node->Child(1)) @@ -1176,9 +1177,11 @@ public: .TopicSettings(settings.TopicSettings.Cast()) .Consumers(settings.Consumers.Cast()) .Settings(settings.Other) + .ExistingOk<TCoAtom>().Value(existingOk).Build() .Done() .Ptr(); - } else if (mode == "alter") { + } else if (mode == "alter" || mode == "alter_if_exists") { + bool missingOk = mode == "alter_if_exists"; return Build<TKiAlterTopic>(ctx, node->Pos()) .World(node->Child(0)) .DataSink(node->Child(1)) @@ -1188,14 +1191,17 @@ public: .AlterConsumers(settings.AlterConsumers.Cast()) .DropConsumers(settings.DropConsumers.Cast()) .Settings(settings.Other) + .MissingOk<TCoAtom>().Value(missingOk).Build() .Done() .Ptr(); - } else if (mode == "drop") { + } else if (mode == "drop" || mode == "drop_if_exists") { + bool missingOk = (mode == "drop_if_exists"); return Build<TKiDropTopic>(ctx, node->Pos()) .World(node->Child(0)) .DataSink(node->Child(1)) .Topic().Build(key.GetTopicPath()) .Settings(settings.Other) + .MissingOk<TCoAtom>().Value(missingOk).Build() .Done() .Ptr(); } else { diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index bfa76ad0d18..3f83b415947 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1968,38 +1968,6 @@ public: }); } - if (auto maybeCreate = TMaybeNode<TKiCreateTopic>(input)) { - auto requireStatus = RequireChild(*input, 0); - if (requireStatus.Level != TStatus::Ok) { - return SyncStatus(requireStatus); - } - auto cluster = TString(maybeCreate.Cast().DataSink().Cluster()); - TString topicName = TString(maybeCreate.Cast().Topic()); - Ydb::Topic::CreateTopicRequest createReq; - createReq.set_path(topicName); - for (const auto& consumer : maybeCreate.Cast().Consumers()) { - auto error = AddConsumerToTopicRequest(createReq.add_consumers(), consumer); - if (!error.empty()) { - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << error << input->Content())); - return SyncError(); - } - } - AddTopicSettingsToRequest(&createReq,maybeCreate.Cast().TopicSettings()); - bool prepareOnly = SessionCtx->Query().PrepareOnly; - // DEBUG - // Cerr << "Create topic request proto: " << createReq.DebugString() << Endl; - auto future = prepareOnly ? CreateDummySuccess() : ( - Gateway->CreateTopic(cluster, std::move(createReq)) - ); - - return WrapFuture(future, - [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { - Y_UNUSED(res); - auto resultNode = ctx.NewWorld(input->Pos()); - return resultNode; - }, "Executing CREATE TOPIC"); - } - if (auto maybeCreateSequence = TMaybeNode<TKiCreateSequence>(input)) { auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != TStatus::Ok) { @@ -2061,6 +2029,35 @@ public: }, "Executing CREATE SEQUENCE"); } + if (auto maybeCreate = TMaybeNode<TKiCreateTopic>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + auto cluster = TString(maybeCreate.Cast().DataSink().Cluster()); + TString topicName = TString(maybeCreate.Cast().Topic()); + Ydb::Topic::CreateTopicRequest createReq; + createReq.set_path(topicName); + for (const auto& consumer : maybeCreate.Cast().Consumers()) { + auto error = AddConsumerToTopicRequest(createReq.add_consumers(), consumer); + if (!error.empty()) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << error << input->Content())); + return SyncError(); + } + } + AddTopicSettingsToRequest(&createReq,maybeCreate.Cast().TopicSettings()); + bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1"); + + auto future = Gateway->CreateTopic(cluster, std::move(createReq), existingOk); + + return WrapFuture(future, + [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(res); + auto resultNode = ctx.NewWorld(input->Pos()); + return resultNode; + }, "Executing CREATE TOPIC"); + } + if (auto maybeAlter = TMaybeNode<TKiAlterTopic>(input)) { auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != TStatus::Ok) { @@ -2088,13 +2085,9 @@ public: auto name = consumer.Cast<TCoAtom>().StringValue(); alterReq.add_drop_consumers(name); } + bool missingOk = (maybeAlter.MissingOk().Cast().Value() == "1"); AddAlterTopicSettingsToRequest(&alterReq, maybeAlter.Cast().TopicSettings()); - bool prepareOnly = SessionCtx->Query().PrepareOnly; - // DEBUG - // Cerr << "Alter topic request proto:\n" << alterReq.DebugString() << Endl; - auto future = prepareOnly ? CreateDummySuccess() : ( - Gateway->AlterTopic(cluster, std::move(alterReq)) - ); + auto future = Gateway->AlterTopic(cluster, std::move(alterReq), missingOk); return WrapFuture(future, [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { @@ -2105,21 +2098,15 @@ public: } if (auto maybeDrop = TMaybeNode<TKiDropTopic>(input)) { - if (!EnsureNotPrepare("DROP TOPIC", input->Pos(), SessionCtx->Query(), ctx)) { - return SyncError(); - } - auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != TStatus::Ok) { return SyncStatus(requireStatus); } auto cluster = TString(maybeDrop.Cast().DataSink().Cluster()); TString topicName = TString(maybeDrop.Cast().Topic()); + bool missingOk = (maybeDrop.MissingOk().Cast().Value() == "1"); - bool prepareOnly = SessionCtx->Query().PrepareOnly; - auto future = prepareOnly ? CreateDummySuccess() : ( - Gateway->DropTopic(cluster, topicName) - ); + auto future = Gateway->DropTopic(cluster, topicName, missingOk); return WrapFuture(future, [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index 2110c957525..20725891040 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -171,7 +171,8 @@ {"Index": 2, "Name": "Topic", "Type": "TCoAtom"}, {"Index": 3, "Name": "Consumers", "Type": "TCoTopicConsumerList"}, {"Index": 4, "Name": "TopicSettings", "Type": "TCoNameValueTupleList"}, - {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"} + {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 6, "Name": "ExistingOk", "Type": "TCoAtom"} ] }, { @@ -186,7 +187,9 @@ {"Index": 4, "Name": "AlterConsumers", "Type": "TCoTopicConsumerList"}, {"Index": 5, "Name": "DropConsumers", "Type": "TCoAtomList"}, {"Index": 6, "Name": "TopicSettings", "Type": "TCoNameValueTupleList"}, - {"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"} + {"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 8, "Name": "MissingOk", "Type": "TCoAtom"} + ] }, { @@ -197,7 +200,9 @@ {"Index": 0, "Name": "World", "Type": "TExprBase"}, {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, {"Index": 2, "Name": "Topic", "Type": "TCoAtom"}, - {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"} + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"} + ] }, { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 6e0a6a9f60a..33991c8f026 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -14,6 +14,7 @@ #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/services/metadata/abstract/kqp_common.h> #include <ydb/services/metadata/manager/abstract.h> +#include <ydb/services/persqueue_v1/actors/events.h> #include <ydb/core/external_sources/external_source_factory.h> #include <ydb/core/kqp/query_data/kqp_query_data.h> @@ -708,6 +709,13 @@ struct TCreateExternalTableSettings { TVector<std::pair<TString, TString>> SourceTypeParameters; }; +struct TAlterTopicSettings { + Ydb::Topic::AlterTopicRequest Request; + TString Name; + TString WorkDir; + bool MissingOk; +}; + struct TSequenceSettings { TMaybe<i64> MinValue; TMaybe<i64> MaxValue; @@ -986,11 +994,13 @@ public: virtual NThreading::TFuture<TGenericResult> DropTable(const TString& cluster, const TDropTableSettings& settings) = 0; - virtual NThreading::TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) = 0; + virtual NThreading::TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) = 0; + + virtual NThreading::TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) = 0; - virtual NThreading::TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) = 0; + virtual NThreading::TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(TAlterTopicSettings&& settings) = 0; - virtual NThreading::TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) = 0; + virtual NThreading::TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) = 0; virtual NThreading::TFuture<TGenericResult> CreateReplication(const TString& cluster, const TCreateReplicationSettings& settings) = 0; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 9a990514392..1c10f842f18 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3118,7 +3118,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { // Shuffled auto client = kikimr.GetQueryClient(); - { + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/ColumnShard` (Col3, Col4, Col2, Col1) VALUES ("test100", "100", 1000, 100u); @@ -3167,7 +3167,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); - + { auto it = client.ExecuteQuery(R"( REPLACE INTO `/Root/DataShard` (Col1, Col2) VALUES (0u, 0); @@ -3185,7 +3185,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CompareYson(output, R"([[0u;[0];#];[1u;#;["test"]]])"); } - { + { auto it = client.ExecuteQuery(R"( REPLACE INTO `/Root/DataShard` (Col1, Col3) VALUES (0u, 'null'); REPLACE INTO `/Root/DataShard` (Col1) VALUES (1u); @@ -3547,7 +3547,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); - { + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES (100u, 1000), (100u, 1000); @@ -3555,7 +3555,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); } - { + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES (100u, 1000), (100u, 1000); @@ -3598,7 +3598,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); - { + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES (1u, 1) )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); @@ -4020,6 +4020,256 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { ); } } + + + void RunQuery (const TString& query, auto& session, bool expectOk = true) { + auto qResult = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + if (!qResult.IsSuccess()) { + Cerr << "Query failed, status: " << qResult.GetStatus() << ": " << qResult.GetIssues().ToString() << Endl; + } + UNIT_ASSERT(qResult.IsSuccess() == expectOk); + }; + + struct TEntryCheck { + NYdb::NScheme::ESchemeEntryType Type; + TString Name; + bool IsExpected; + bool WasFound = false; + }; + + TEntryCheck ExpectedTopic(const TString& name) { + return TEntryCheck{NYdb::NScheme::ESchemeEntryType::Topic, name, true}; + } + TEntryCheck UnexpectedTopic(const TString& name) { + return TEntryCheck{NYdb::NScheme::ESchemeEntryType::Topic, name, false}; + } + + void CheckDirEntry(TKikimrRunner& kikimr, TVector<TEntryCheck>& entriesToCheck) { + auto res = kikimr.GetSchemeClient().ListDirectory("/Root").GetValueSync(); + for (const auto& entry : res.GetChildren()) { + Cerr << "Scheme entry: " << entry << Endl; + for (auto& checkEntry : entriesToCheck) { + if (checkEntry.Name != entry.Name) + continue; + if (checkEntry.IsExpected) { + UNIT_ASSERT_C(entry.Type == checkEntry.Type, checkEntry.Name); + checkEntry.WasFound = true; + } else { + UNIT_ASSERT_C(entry.Type != checkEntry.Type, checkEntry.Name); + } + } + } + for (auto& checkEntry : entriesToCheck) { + if (checkEntry.IsExpected) { + UNIT_ASSERT_C(checkEntry.WasFound, checkEntry.Name); + } + } + } + + Y_UNIT_TEST(CreateAndDropTopic) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + serverSettings.PQConfig.SetRequireCredentialsInNewProtocol(false); + TKikimrRunner kikimr( + serverSettings.SetWithSampleTables(false).SetEnableTempTables(true)); + auto client = kikimr.GetQueryClient(); + auto session = client.GetSession().GetValueSync().GetSession(); + auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(), + NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin")); + + { + const auto queryCreateTopic = Q_(R"( + --!syntax_v1 + CREATE TOPIC `/Root/TempTopic` (CONSUMER cons1); + )"); + RunQuery(queryCreateTopic, session); + Cerr << "Topic created\n"; + auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetConsumers().size(), 1); + } + { + const auto queryCreateTopic = Q_(R"( + --!syntax_v1 + CREATE TOPIC IF NOT EXISTS `/Root/TempTopic` (CONSUMER cons1, CONSUMER cons2); + )"); + RunQuery(queryCreateTopic, session); + auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetConsumers().size(), 1); + } + { + const auto queryCreateTopic = Q_(R"( + --!syntax_v1 + CREATE TOPIC `/Root/TempTopic` (CONSUMER cons1, CONSUMER cons2, CONSUMER cons3); + )"); + RunQuery(queryCreateTopic, session, false); + auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetConsumers().size(), 1); + } + + TVector<TEntryCheck> entriesToCheck = {ExpectedTopic("TempTopic")}; + CheckDirEntry(kikimr, entriesToCheck); + { + const auto query = Q_(R"( + --!syntax_v1 + Drop TOPIC `/Root/TempTopic`; + )"); + RunQuery(query, session); + Cerr << "Topic dropped\n"; + TVector<TEntryCheck> entriesToCheck = {UnexpectedTopic("TempTopic")}; + CheckDirEntry(kikimr, entriesToCheck); + } + { + const auto query = Q_(R"( + --!syntax_v1 + Drop TOPIC IF EXISTS `/Root/TempTopic`; + )"); + RunQuery(query, session); + } + { + const auto query = Q_(R"( + --!syntax_v1 + Drop TOPIC `/Root/TempTopic`; + )"); + RunQuery(query, session, false); + } + } + + Y_UNIT_TEST(CreateAndAlterTopic) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr{serverSettings}; + auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin")); + auto session = client.GetSession().GetValueSync().GetSession(); + auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(), + NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin")); + + { + const auto queryCreateTopic = Q_(R"( + --!syntax_v1 + CREATE TOPIC `/Root/TempTopic` (CONSUMER cons1); + )"); + RunQuery(queryCreateTopic, session); + + auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1); + } + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TempTopic` SET (min_active_partitions = 10); + )"); + RunQuery(query, session); + auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 10); + } + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC IF EXISTS `/Root/TempTopic` SET (min_active_partitions = 15); + )"); + RunQuery(query, session); + auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 15); + } + + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/NoSuchTopic` SET (min_active_partitions = 10); + )"); + RunQuery(query, session, false); + + TVector<TEntryCheck> entriesToCheck = {UnexpectedTopic("NoSuchTopic")}; + CheckDirEntry(kikimr, entriesToCheck); + } + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC IF EXISTS `/Root/NoSuchTopic` SET (min_active_partitions = 10); + )"); + RunQuery(query, session); + TVector<TEntryCheck> entriesToCheck = {UnexpectedTopic("NoSuchTopic")}; + CheckDirEntry(kikimr, entriesToCheck); + } + } + Y_UNIT_TEST(CreateOrDropTopicOverTable) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr{serverSettings}; + auto tableClient = kikimr.GetTableClient(); + + { + auto tcSession = tableClient.CreateSession().GetValueSync().GetSession(); + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/TmpTable` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync().IsSuccess()); + tcSession.Close(); + } + + auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin")); + auto session = client.GetSession().GetValueSync().GetSession(); + + TVector<TEntryCheck> entriesToCheck = {TEntryCheck{.Type = NYdb::NScheme::ESchemeEntryType::Table, + .Name = "TmpTable", .IsExpected = true}}; + { + const auto queryCreateTopic = Q_(R"( + --!syntax_v1 + CREATE TOPIC `/Root/TmpTable` (CONSUMER cons1); + )"); + RunQuery(queryCreateTopic, session, false); + CheckDirEntry(kikimr, entriesToCheck); + + } + { + const auto queryCreateTopic = Q_(R"( + --!syntax_v1 + CREATE TOPIC IF NOT EXISTS `/Root/TmpTable` (CONSUMER cons1); + )"); + RunQuery(queryCreateTopic, session, false); + CheckDirEntry(kikimr, entriesToCheck); + } + { + const auto queryDropTopic = Q_(R"( + --!syntax_v1 + DROP TOPIC `/Root/TmpTable`; + )"); + RunQuery(queryDropTopic, session, false); + } + { + const auto queryDropTopic = Q_(R"( + --!syntax_v1 + DROP TOPIC IF EXISTS `/Root/TmpTable`; + )"); + RunQuery(queryDropTopic, session, false); + CheckDirEntry(kikimr, entriesToCheck); + } + { + auto tcSession = tableClient.CreateSession().GetValueSync().GetSession(); + auto type = TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Uint64).EndOptional().Build(); + auto alter = NYdb::NTable::TAlterTableSettings().AppendAddColumns(TColumn("NewColumn", type)); + + auto alterResult = tcSession.AlterTable("/Root/TmpTable", alter + ).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index d2bbb91c5cc..70d0391e283 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -457,6 +457,9 @@ message TKqpSchemeOperation { NKikimrSchemeOp.TModifyScheme AlterResourcePool = 39; NKikimrSchemeOp.TModifyScheme DropResourcePool = 40; TKqpAnalyzeOperation AnalyzeTable = 41; + NKikimrSchemeOp.TModifyScheme CreateTopic = 42; + NKikimrSchemeOp.TModifyScheme AlterTopic = 43; + NKikimrSchemeOp.TModifyScheme DropTopic = 44; } } diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp index e66563d4872..28e7497d252 100644 --- a/ydb/core/tablet/tablet_counters_aggregator.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator.cpp @@ -159,7 +159,7 @@ public: } for (ui32 i = 0, e = labeledCounters->GetCounters().Size(); i < e; ++i) { - if(!strlen(labeledCounters->GetCounterName(i))) + if(!strlen(labeledCounters->GetCounterName(i))) continue; const ui64& value = labeledCounters->GetCounters()[i].Get(); const ui64& id = labeledCounters->GetIds()[i].Get(); @@ -1501,7 +1501,8 @@ TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvTabletLabeledCo continue; } if (groupNames[j] == "Client") { - group = group->GetSubgroup("ConsumerPath", NPersQueue::ConvertOldConsumerName(groups[j], ctx)); + group = group->GetSubgroup("ConsumerPath", + NPersQueue::ConvertOldConsumerName(groups[j], AppData(ctx)->PQConfig)); continue; } } @@ -2095,7 +2096,7 @@ public: if (groups.size() == 1) { //topic case ff = groups[0]; } else if (groups.size() == 3) { //client important topic - res = NPersQueue::ConvertOldConsumerName(groups[0], ctx) + "|" + groups[1] + "|"; + res = NPersQueue::ConvertOldConsumerName(groups[0], AppData(ctx)->PQConfig) + "|" + groups[1] + "|"; ff = groups[2]; } else { continue; diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp index dec46a7d803..c71c9ac6084 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.cpp +++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp @@ -75,22 +75,31 @@ TString GetFullTopicPath(const NActors::TActorContext& ctx, const TMaybe<TString } } -TString ConvertNewConsumerName(const TString& consumer, const NActors::TActorContext& ctx) { - if (NKikimr::AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { +TString ConvertNewConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig) { + if (pqConfig.GetTopicsAreFirstClassCitizen()) { return consumer; } else { return ConvertNewConsumerName(consumer); } } -TString ConvertOldConsumerName(const TString& consumer, const NActors::TActorContext& ctx) { - if (NKikimr::AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { +TString ConvertNewConsumerName(const TString& consumer, const NActors::TActorContext& ctx) { + return ConvertNewConsumerName(consumer, NKikimr::AppData(ctx)->PQConfig); +} + +TString ConvertOldConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig) { + if (pqConfig.GetTopicsAreFirstClassCitizen()) { return consumer; } else { return ConvertOldConsumerName(consumer); } } +TString ConvertOldConsumerName(const TString& consumer, const NActors::TActorContext& ctx) { + return ConvertOldConsumerName(consumer, NKikimr::AppData(ctx)->PQConfig); +} + + TString MakeConsumerPath(const TString& consumer) { TStringBuilder res; res.reserve(consumer.size()); diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 95d48657293..9ca5809460b 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -18,7 +18,9 @@ namespace NKikimr::NMsgBusProxy::NPqMetaCacheV2 { namespace NPersQueue { TString GetFullTopicPath(const NActors::TActorContext& ctx, const TMaybe<TString>& database, const TString& topicPath); +TString ConvertNewConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig); TString ConvertNewConsumerName(const TString& consumer, const NActors::TActorContext& ctx); +TString ConvertOldConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig); TString ConvertOldConsumerName(const TString& consumer, const NActors::TActorContext& ctx); TString MakeConsumerPath(const TString& consumer); diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in index 838d5608013..3af06e1158c 100644 --- a/ydb/library/yql/sql/v1/SQLv1.g.in +++ b/ydb/library/yql/sql/v1/SQLv1.g.in @@ -915,7 +915,7 @@ multiple_column_assignment: set_target_list EQUALS LPAREN simple_values_source R set_target_list: LPAREN set_target (COMMA set_target)* RPAREN; // topics -create_topic_stmt: CREATE TOPIC topic_ref create_topic_entries? with_topic_settings?; +create_topic_stmt: CREATE TOPIC (IF NOT EXISTS)? topic_ref create_topic_entries? with_topic_settings?; create_topic_entries: LPAREN create_topic_entry (COMMA create_topic_entry)* RPAREN; create_topic_entry: @@ -923,7 +923,7 @@ create_topic_entry: ; with_topic_settings: WITH LPAREN topic_settings RPAREN; -alter_topic_stmt: ALTER TOPIC topic_ref alter_topic_action (COMMA alter_topic_action)*; +alter_topic_stmt: ALTER TOPIC (IF EXISTS)? topic_ref alter_topic_action (COMMA alter_topic_action)*; alter_topic_action: alter_topic_add_consumer | alter_topic_alter_consumer @@ -949,7 +949,7 @@ topic_alter_consumer_reset: RESET LPAREN an_id (COMMA an_id)* RPAREN; alter_topic_set_settings: SET LPAREN topic_settings RPAREN; alter_topic_reset_settings: RESET LPAREN an_id (COMMA an_id_pure)* RPAREN; -drop_topic_stmt: DROP TOPIC topic_ref; +drop_topic_stmt: DROP TOPIC (IF EXISTS)? topic_ref; topic_settings: topic_settings_entry (COMMA topic_settings_entry)*; topic_settings_entry: an_id EQUALS topic_setting_value; diff --git a/ydb/library/yql/sql/v1/format/sql_format.cpp b/ydb/library/yql/sql/v1/format/sql_format.cpp index 8ebff4d1df8..88a35bd7bd9 100644 --- a/ydb/library/yql/sql/v1/format/sql_format.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format.cpp @@ -1385,10 +1385,11 @@ private: NewLine(); VisitKeyword(msg.GetToken1()); VisitKeyword(msg.GetToken2()); - Visit(msg.GetRule_topic_ref3()); - if (msg.HasBlock4()) { + Visit(msg.GetBlock3()); + Visit(msg.GetRule_topic_ref4()); + if (msg.HasBlock5()) { PushCurrentIndent(); - auto& b = msg.GetBlock4().GetRule_create_topic_entries1(); + auto& b = msg.GetBlock5().GetRule_create_topic_entries1(); Visit(b.GetToken1()); NewLine(); Visit(b.GetRule_create_topic_entry2()); @@ -1401,8 +1402,8 @@ private: PopCurrentIndent(); Visit(b.GetToken4()); } - if (msg.HasBlock5()) { - auto& b = msg.GetBlock5().GetRule_with_topic_settings1(); + if (msg.HasBlock6()) { + auto& b = msg.GetBlock6().GetRule_with_topic_settings1(); VisitKeyword(b.GetToken1()); VisitKeyword(b.GetToken2()); PushCurrentIndent(); @@ -1419,11 +1420,12 @@ private: NewLine(); VisitKeyword(msg.GetToken1()); VisitKeyword(msg.GetToken2()); - Visit(msg.GetRule_topic_ref3()); + Visit(msg.GetBlock3()); + Visit(msg.GetRule_topic_ref4()); NewLine(); PushCurrentIndent(); - Visit(msg.GetRule_alter_topic_action4()); - for (auto& b : msg.GetBlock5()) { + Visit(msg.GetRule_alter_topic_action5()); + for (auto& b : msg.GetBlock6()) { Visit(b.GetToken1()); NewLine(); Visit(b.GetRule_alter_topic_action2()); diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index 3e386b5d2dd..fedbf0dd625 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -284,7 +284,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { {"create table user(index user local on (user) cover (user,user))", "CREATE TABLE user (\n\tINDEX user LOCAL ON (user) COVER (user, user)\n);\n"}, {"create table user(index idx global using subtype on (col) cover (col) with (setting = foo, another_setting = bar));", - "CREATE TABLE user (\n\tINDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = bar)\n);\n"}, + "CREATE TABLE user (\n\tINDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = bar)\n);\n"}, {"create table user(family user (user='foo'))", "CREATE TABLE user (\n\tFAMILY user (user = 'foo')\n);\n"}, {"create table user(family user (user='foo',user='bar'))", @@ -465,7 +465,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { {"alter table user alter index idx reset (setting, another_setting)", "ALTER TABLE user\n\tALTER INDEX idx RESET (setting, another_setting);\n"}, {"alter table user add index idx global using subtype on (col) cover (col) with (setting = foo, another_setting = 'bar');", - "ALTER TABLE user\n\tADD INDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = 'bar');\n"}, + "ALTER TABLE user\n\tADD INDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = 'bar');\n"}, {"alter table user drop index user", "ALTER TABLE user\n\tDROP INDEX user;\n"}, {"alter table user rename to user", @@ -513,6 +513,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { TSetup setup; setup.Run(cases); } + Y_UNIT_TEST(AlterTopic) { TCases cases = { {"alter topic topic1 alter consumer c1 set (important = false)", @@ -529,6 +530,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { TSetup setup; setup.Run(cases); } + Y_UNIT_TEST(DropTopic) { TCases cases = { {"drop topic topic1", @@ -539,6 +541,20 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { setup.Run(cases); } + Y_UNIT_TEST(TopicExistsStatement) { + TCases cases = { + {"drop topic if exists topic1", + "DROP TOPIC IF EXISTS topic1;\n"}, + {"create topic if not exists topic1 with (partition_count_limit = 5)", + "CREATE TOPIC IF NOT EXISTS topic1 WITH (\n\tpartition_count_limit = 5\n);\n"}, + {"alter topic if exists topic1 alter consumer c1 set (important = false)", + "ALTER TOPIC IF EXISTS topic1\n\tALTER CONSUMER c1 SET (important = FALSE);\n"}, + }; + + TSetup setup; + setup.Run(cases); + } + Y_UNIT_TEST(Do) { TCases cases = { {"do $a(1,2,3)", diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index 0e9ca3f7b90..b5c3272f53d 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -1334,6 +1334,7 @@ namespace NSQLTranslationV1 { struct TCreateTopicParameters { TVector<TTopicConsumerDescription> Consumers; TTopicSettings TopicSettings; + bool ExistingOk; }; struct TAlterTopicParameters { @@ -1341,6 +1342,11 @@ namespace NSQLTranslationV1 { THashMap<TString, TTopicConsumerDescription> AlterConsumers; TVector<TIdentifier> DropConsumers; TTopicSettings TopicSettings; + bool MissingOk; + }; + + struct TDropTopicParameters { + bool MissingOk; }; TString IdContent(TContext& ctx, const TString& str); @@ -1475,7 +1481,8 @@ namespace NSQLTranslationV1 { TScopedStatePtr scoped); TNodePtr BuildAlterTopic(TPosition pos, const TTopicRef& tr, const TAlterTopicParameters& params, TScopedStatePtr scoped); - TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& topic, TScopedStatePtr scoped); + TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& topic, const TDropTopicParameters& params, + TScopedStatePtr scoped); template<class TContainer> TMaybe<TString> FindMistypeIn(const TContainer& container, const TString& name) { diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index cdd7ba0ef74..00e42bacde0 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -1630,7 +1630,8 @@ public: } auto opts = Y(); - opts = L(opts, Q(Y(Q("mode"), Q("create")))); + TString mode = Params.ExistingOk ? "create_if_not_exists" : "create"; + opts = L(opts, Q(Y(Q("mode"), Q(mode)))); for (const auto& consumer : Params.Consumers) { const auto& desc = CreateConsumerDesc(consumer, *this, false); @@ -1741,7 +1742,8 @@ public: } auto opts = Y(); - opts = L(opts, Q(Y(Q("mode"), Q("alter")))); + TString mode = Params.MissingOk ? "alter_if_exists" : "alter"; + opts = L(opts, Q(Y(Q("mode"), Q(mode)))); for (const auto& consumer : Params.AddConsumers) { const auto& desc = CreateConsumerDesc(consumer, *this, false); @@ -1815,9 +1817,10 @@ TNodePtr BuildAlterTopic( class TDropTopicNode final: public TAstListNode { public: - TDropTopicNode(TPosition pos, const TTopicRef& tr, TScopedStatePtr scoped) + TDropTopicNode(TPosition pos, const TTopicRef& tr, const TDropTopicParameters& params, TScopedStatePtr scoped) : TAstListNode(pos) , Topic(tr) + , Params(params) , Scoped(scoped) { scoped->UseCluster(TString(KikimrProviderName), Topic.Cluster); @@ -1832,7 +1835,8 @@ public: auto opts = Y(); - opts = L(opts, Q(Y(Q("mode"), Q("drop")))); + TString mode = Params.MissingOk ? "drop_if_exists" : "drop"; + opts = L(opts, Q(Y(Q("mode"), Q(mode)))); Add("block", Q(Y( @@ -1850,12 +1854,13 @@ public: } private: TTopicRef Topic; + TDropTopicParameters Params; TScopedStatePtr Scoped; TSourcePtr FakeSource; }; -TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& tr, TScopedStatePtr scoped) { - return new TDropTopicNode(pos, tr, scoped); +TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& tr, const TDropTopicParameters& params, TScopedStatePtr scoped) { + return new TDropTopicNode(pos, tr, params, scoped); } class TCreateRole final: public TAstListNode { diff --git a/ydb/library/yql/sql/v1/sql_query.cpp b/ydb/library/yql/sql/v1/sql_query.cpp index d4e2c1bed86..066672b7c5c 100644 --- a/ydb/library/yql/sql/v1/sql_query.cpp +++ b/ydb/library/yql/sql/v1/sql_query.cpp @@ -1002,16 +1002,21 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } case TRule_sql_stmt_core::kAltSqlStmtCore35: { Ctx.BodyPart(); - // create_topic_stmt: CREATE TOPIC topic1 (CONSUMER ...)? [WITH (opt1 = val1, ...]? + // create_topic_stmt: CREATE TOPIC (IF NOT EXISTS)? topic1 (CONSUMER ...)? [WITH (opt1 = val1, ...]? auto& rule = core.GetAlt_sql_stmt_core35().GetRule_create_topic_stmt1(); TTopicRef tr; - if (!TopicRefImpl(rule.GetRule_topic_ref3(), tr)) { + if (!TopicRefImpl(rule.GetRule_topic_ref4(), tr)) { return false; } + bool existingOk = false; + if (rule.HasBlock3()) { // if not exists + existingOk = true; + } TCreateTopicParameters params; - if (rule.HasBlock4()) { //create_topic_entry (consumers) - auto& entries = rule.GetBlock4().GetRule_create_topic_entries1(); + params.ExistingOk = existingOk; + if (rule.HasBlock5()) { //create_topic_entry (consumers) + auto& entries = rule.GetBlock5().GetRule_create_topic_entries1(); auto& firstEntry = entries.GetRule_create_topic_entry2(); if (!CreateTopicEntry(firstEntry, params)) { return false; @@ -1024,8 +1029,8 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - if (rule.HasBlock5()) { // with_topic_settings - auto& topic_settings_node = rule.GetBlock5().GetRule_with_topic_settings1().GetRule_topic_settings3(); + if (rule.HasBlock6()) { // with_topic_settings + auto& topic_settings_node = rule.GetBlock6().GetRule_with_topic_settings1().GetRule_topic_settings3(); CreateTopicSettings(topic_settings_node, params.TopicSettings); } @@ -1034,20 +1039,26 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } case TRule_sql_stmt_core::kAltSqlStmtCore36: { // alter_topic_stmt: ALTER TOPIC topic_ref alter_topic_action (COMMA alter_topic_action)*; +// alter_topic_stmt: ALTER TOPIC IF EXISTS topic_ref alter_topic_action (COMMA alter_topic_action)*; Ctx.BodyPart(); auto& rule = core.GetAlt_sql_stmt_core36().GetRule_alter_topic_stmt1(); TTopicRef tr; - if (!TopicRefImpl(rule.GetRule_topic_ref3(), tr)) { + bool missingOk = false; + if (rule.HasBlock3()) { // IF EXISTS + missingOk = true; + } + if (!TopicRefImpl(rule.GetRule_topic_ref4(), tr)) { return false; } TAlterTopicParameters params; - auto& firstEntry = rule.GetRule_alter_topic_action4(); + params.MissingOk = missingOk; + auto& firstEntry = rule.GetRule_alter_topic_action5(); if (!AlterTopicAction(firstEntry, params)) { return false; } - const auto& list = rule.GetBlock5(); + const auto& list = rule.GetBlock6(); for (auto& node : list) { if (!AlterTopicAction(node.GetRule_alter_topic_action2(), params)) { return false; @@ -1058,15 +1069,22 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& break; } case TRule_sql_stmt_core::kAltSqlStmtCore37: { - // drop_topic_stmt: DROP TOPIC + // drop_topic_stmt: DROP TOPIC (IF EXISTS)? topic_ref; Ctx.BodyPart(); const auto& rule = core.GetAlt_sql_stmt_core37().GetRule_drop_topic_stmt1(); + TDropTopicParameters params; + if (rule.HasBlock3()) { // IF EXISTS + params.MissingOk = true; + } else { + params.MissingOk = false; + } + TTopicRef tr; - if (!TopicRefImpl(rule.GetRule_topic_ref3(), tr)) { + if (!TopicRefImpl(rule.GetRule_topic_ref4(), tr)) { return false; } - AddStatementToBlocks(blocks, BuildDropTopic(Ctx.Pos(), tr, Ctx.Scoped)); + AddStatementToBlocks(blocks, BuildDropTopic(Ctx.Pos(), tr, params, Ctx.Scoped)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore38: { diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 69e1a813b8a..e1fd2934f7e 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -170,20 +170,20 @@ namespace NKikimr::NDataStreams::V1 { break; default: return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - "streams can't be created with unknown metering mode", ctx); + "streams can't be created with unknown metering mode"); } } } else { if (GetProtoRequest()->has_stream_mode_details()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - "streams can't be created with metering mode", ctx); + "streams can't be created with metering mode"); } } if (GetProtoRequest()->has_partitioning_settings()) { auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings()); if (!r.empty()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r); } auto& s = GetProtoRequest()->partitioning_settings(); @@ -214,10 +214,10 @@ namespace NKikimr::NDataStreams::V1 { pqDescr->SetPartitionPerTablet(1); TString error; - TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error, + TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, AppData(ctx), error, workingDir, proposal.Record.GetDatabaseName()); if (codes.YdbCode != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx); + return ReplyWithError(codes.YdbCode, codes.PQCode, error); } } @@ -229,8 +229,7 @@ namespace NKikimr::NDataStreams::V1 { { return ReplyWithError(Ydb::StatusIds::ALREADY_EXISTS, static_cast<size_t>(NYds::EErrorCodes::IN_USE), - TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " already exists", - ctx); + TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " already exists"); } return TBase::TBase::Handle(ev, ctx); } @@ -297,7 +296,7 @@ namespace NKikimr::NDataStreams::V1 { if (NPQ::ConsumerCount(pqGroupDescription.GetPQTabletConfig()) > 0 && EnforceDeletion == false) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::IN_USE), TStringBuilder() << "Stream has registered consumers" << - "and EnforceConsumerDeletion flag is false", ActorContext()); + "and EnforceConsumerDeletion flag is false"); } SendProposeRequest(ActorContext()); @@ -314,10 +313,10 @@ namespace NKikimr::NDataStreams::V1 { } void Bootstrap(const TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; }; void TUpdateShardCountActor::Bootstrap(const TActorContext& ctx) { @@ -327,16 +326,17 @@ namespace NKikimr::NDataStreams::V1 { } void TUpdateShardCountActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { Y_UNUSED(selfInfo); + Y_UNUSED(appData); TString error; if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), error, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), error); } groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); @@ -354,10 +354,10 @@ namespace NKikimr::NDataStreams::V1 { } void Bootstrap(const TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; }; void TUpdateStreamModeActor::Bootstrap(const TActorContext& ctx) { @@ -367,16 +367,16 @@ namespace NKikimr::NDataStreams::V1 { } void TUpdateStreamModeActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { Y_UNUSED(selfInfo); Y_UNUSED(pqGroupDescription); - if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + if (!appData->PQConfig.GetBillingMeteringConfig().GetEnabled()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - "streams can't be created with metering mode", ctx); + "streams can't be created with metering mode"); } switch(GetProtoRequest()->stream_mode_details().stream_mode()) { @@ -388,7 +388,7 @@ namespace NKikimr::NDataStreams::V1 { break; default: return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - "streams can't be created with unknown metering mode", ctx); + "streams can't be created with unknown metering mode"); } } @@ -406,7 +406,7 @@ namespace NKikimr::NDataStreams::V1 { } void Bootstrap(const TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo); @@ -419,7 +419,7 @@ namespace NKikimr::NDataStreams::V1 { } void TUpdateStreamActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo @@ -430,7 +430,7 @@ namespace NKikimr::NDataStreams::V1 { if (!GetProtoRequest()->has_partitioning_settings()) { if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error); } groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); @@ -451,15 +451,16 @@ namespace NKikimr::NDataStreams::V1 { default: {} } - auto* pqConfig = groupConfig.MutablePQTabletConfig(); + auto* tabletConfig = groupConfig.MutablePQTabletConfig(); - pqConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond( + tabletConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond( PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec())); + const auto& pqConfig = appData->PQConfig; if (GetProtoRequest()->has_stream_mode_details()) { - if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + if (!pqConfig.GetBillingMeteringConfig().GetEnabled()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - "streams can't be created with metering mode", ctx); + "streams can't be created with metering mode"); } switch(GetProtoRequest()->stream_mode_details().stream_mode()) { @@ -471,14 +472,14 @@ namespace NKikimr::NDataStreams::V1 { break; default: return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - "streams can't be created with unknown metering mode", ctx); + "streams can't be created with unknown metering mode"); } } if (GetProtoRequest()->has_partitioning_settings()) { auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings()); if (!r.empty()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r); } auto& s = GetProtoRequest()->partitioning_settings(); @@ -519,12 +520,13 @@ namespace NKikimr::NDataStreams::V1 { t->SetScaleDownPartitionWriteSpeedThresholdPercent(ws.down_utilization_percent() ? ws.down_utilization_percent() : 30); } - auto serviceTypes = GetSupportedClientServiceTypes(ctx); - auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); + auto serviceTypes = GetSupportedClientServiceTypes(pqConfig); + auto status = CheckConfig(*tabletConfig, serviceTypes, error, pqConfig, + Ydb::StatusIds::ALREADY_EXISTS); if (status != Ydb::StatusIds::SUCCESS) { return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) : static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), - error, ctx); + error); } } @@ -541,7 +543,7 @@ namespace NKikimr::NDataStreams::V1 { } void Bootstrap(const TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo); @@ -554,7 +556,7 @@ namespace NKikimr::NDataStreams::V1 { } void TSetWriteQuotaActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo @@ -564,17 +566,18 @@ namespace NKikimr::NDataStreams::V1 { TString error; - auto* pqConfig = groupConfig.MutablePQTabletConfig(); + auto* tabletConfig = groupConfig.MutablePQTabletConfig(); - pqConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB); - pqConfig->MutablePartitionConfig()->SetBurstSize(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB); + tabletConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB); + tabletConfig->MutablePartitionConfig()->SetBurstSize(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB); - auto serviceTypes = GetSupportedClientServiceTypes(ctx); - auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); + const auto& pqConfig = appData->PQConfig; + auto serviceTypes = GetSupportedClientServiceTypes(pqConfig); + auto status = CheckConfig(*tabletConfig, serviceTypes, error, pqConfig, Ydb::StatusIds::ALREADY_EXISTS); if (status != Ydb::StatusIds::SUCCESS) { return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS? static_cast<size_t>(NYds::EErrorCodes::IN_USE) : static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), - error, ctx); + error); } } @@ -600,7 +603,7 @@ namespace NKikimr::NDataStreams::V1 { } void ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo @@ -611,9 +614,9 @@ namespace NKikimr::NDataStreams::V1 { TString error; Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS; - auto* pqConfig = groupConfig.MutablePQTabletConfig(); + auto* tabletConfig = groupConfig.MutablePQTabletConfig(); - ui32 currentLifetime = pqConfig->GetPartitionConfig().GetLifetimeSeconds(); + ui32 currentLifetime = tabletConfig->GetPartitionConfig().GetLifetimeSeconds(); ui32 newLifetime = TInstant::Hours(this->GetProtoRequest()->retention_period_hours()).Seconds(); if (ShouldIncrease) { if (newLifetime <= currentLifetime) { @@ -628,16 +631,18 @@ namespace NKikimr::NDataStreams::V1 { status = Ydb::StatusIds::BAD_REQUEST; } } + const auto& pqConfig = appData->PQConfig; if (status == Ydb::StatusIds::SUCCESS) { - pqConfig->MutablePartitionConfig()->SetLifetimeSeconds(newLifetime); + tabletConfig->MutablePartitionConfig()->SetLifetimeSeconds(newLifetime); - auto serviceTypes = GetSupportedClientServiceTypes(ctx); - status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); + auto serviceTypes = GetSupportedClientServiceTypes(pqConfig); + status = CheckConfig(*tabletConfig, serviceTypes, error, + pqConfig, Ydb::StatusIds::ALREADY_EXISTS); } if (status != Ydb::StatusIds::SUCCESS) { return TBase::ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) : - static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), error, ctx); + static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), error); } } @@ -662,16 +667,16 @@ namespace NKikimr::NDataStreams::V1 { void StateWork(TAutoPtr<IEventHandle>& ev); void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); - void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, - TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId); } } - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) { ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, - TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId); } void Handle(TEvPersQueue::TEvOffsetsResponse::TPtr& ev, const TActorContext& ctx) { @@ -866,7 +871,7 @@ namespace NKikimr::NDataStreams::V1 { } if (!startShardFound) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, - TStringBuilder() << "Bad shard id " << GetProtoRequest()->exclusive_start_shard_id(), ctx); + TStringBuilder() << "Bad shard id " << GetProtoRequest()->exclusive_start_shard_id()); } return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); } @@ -1084,15 +1089,15 @@ namespace NKikimr::NDataStreams::V1 { if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_arn().empty()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION), - TStringBuilder() << "StreamArn and NextToken can not be provided together", ctx); + TStringBuilder() << "StreamArn and NextToken can not be provided together"); } if (NextToken.IsExpired()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN), - TStringBuilder() << "Provided NextToken is expired", ctx); + TStringBuilder() << "Provided NextToken is expired"); } if (!NextToken.IsValid()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - TStringBuilder() << "Provided NextToken is malformed", ctx); + TStringBuilder() << "Provided NextToken is malformed"); } auto maxResultsInRange = MIN_MAX_RESULTS <= MaxResults && MaxResults <= MAX_MAX_RESULTS; @@ -1100,7 +1105,7 @@ namespace NKikimr::NDataStreams::V1 { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), TStringBuilder() << "Requested max_result value '" << MaxResults << "' is out of range [" << MIN_MAX_RESULTS << ", " << MAX_MAX_RESULTS << - "]", ctx); + "]"); } SendDescribeProposeRequest(ctx); Become(&TListStreamConsumersActor::StateWork); @@ -1132,7 +1137,7 @@ namespace NKikimr::NDataStreams::V1 { if (alreadyRead > consumerCount) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " << - "everything is already read", ActorContext()); + "everything is already read"); } const auto rulesToRead = std::min(consumerCount - alreadyRead, MaxResults); @@ -1180,10 +1185,10 @@ namespace NKikimr::NDataStreams::V1 { ~TRegisterStreamConsumerActor() = default; void Bootstrap(const NActors::TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override; private: @@ -1202,14 +1207,14 @@ namespace NKikimr::NDataStreams::V1 { } void TRegisterStreamConsumerActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { Y_UNUSED(pqGroupDescription); - auto* pqConfig = groupConfig.MutablePQTabletConfig(); + auto* tabletConfig = groupConfig.MutablePQTabletConfig(); Ydb::PersQueue::V1::TopicSettings::ReadRule readRule; readRule.set_consumer_name(ConsumerName); readRule.set_supported_format(Ydb::PersQueue::V1::TopicSettings_Format_FORMAT_BASE); @@ -1220,14 +1225,15 @@ namespace NKikimr::NDataStreams::V1 { if (readRule.version() == 0) { readRule.set_version(selfInfo.GetVersion().GetPQVersion()); } - auto serviceTypes = GetSupportedClientServiceTypes(ctx); - auto messageAndCode = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx); - size_t issueCode = static_cast<size_t>(messageAndCode.PQCode); + const auto& pqConfig = appData->PQConfig; + auto serviceTypes = GetSupportedClientServiceTypes(pqConfig); + auto messageAndCode = AddReadRuleToConfig(tabletConfig, readRule, serviceTypes, pqConfig); + size_t issueCode = static_cast<size_t>(messageAndCode.PQCode); Ydb::StatusIds::StatusCode status; if (messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK) { - status = CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS); + status = CheckConfig(*tabletConfig, serviceTypes, messageAndCode.Message, pqConfig, Ydb::StatusIds::ALREADY_EXISTS); if (status == Ydb::StatusIds::ALREADY_EXISTS) { issueCode = static_cast<size_t>(NYds::EErrorCodes::IN_USE); } @@ -1236,7 +1242,7 @@ namespace NKikimr::NDataStreams::V1 { } if (status != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(status, issueCode, messageAndCode.Message, ctx); + return ReplyWithError(status, issueCode, messageAndCode.Message); } } @@ -1265,10 +1271,10 @@ namespace NKikimr::NDataStreams::V1 { ~TDeregisterStreamConsumerActor() = default; void Bootstrap(const NActors::TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; private: TString ConsumerName; @@ -1286,7 +1292,7 @@ namespace NKikimr::NDataStreams::V1 { } void TDeregisterStreamConsumerActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo @@ -1296,10 +1302,10 @@ namespace NKikimr::NDataStreams::V1 { groupConfig.MutablePQTabletConfig(), pqGroupDescription.GetPQTabletConfig(), GetProtoRequest()->consumer_name(), - ctx + appData->PQConfig ); if (!error.Empty()) { - return ReplyWithError(Ydb::StatusIds::NOT_FOUND, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), error, ctx); + return ReplyWithError(Ydb::StatusIds::NOT_FOUND, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), error); } } @@ -1352,7 +1358,7 @@ namespace NKikimr::NDataStreams::V1 { auto sn = SequenceNumberToInt(GetProtoRequest()->starting_sequence_number()); if (!sn) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - TStringBuilder() << "Malformed sequence number", ctx); + TStringBuilder() << "Malformed sequence number"); } SequenceNumber = sn.value() + (IteratorType == TIteratorType::AFTER_SEQUENCE_NUMBER ? 1u : 0u); } @@ -1361,13 +1367,13 @@ namespace NKikimr::NDataStreams::V1 { if (GetProtoRequest()->timestamp() == 0) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard iterator type is AT_TIMESTAMP, " << - "but a timestamp is missed", ctx); + "but a timestamp is missed"); } if (GetProtoRequest()->timestamp() > static_cast<i64>(TInstant::Now().MilliSeconds()) + TIMESTAMP_DELTA_ALLOWED_MS) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard iterator type is AT_TIMESTAMP, " << - "but a timestamp is in the future", ctx); + "but a timestamp is in the future"); } ReadTimestampMs = GetProtoRequest()->timestamp(); break; @@ -1380,7 +1386,7 @@ namespace NKikimr::NDataStreams::V1 { default: return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard iterator type '" << - (ui32)IteratorType << "' is not known", ctx); + (ui32)IteratorType << "' is not known"); } @@ -1412,7 +1418,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << token.GetUserSID(), ActorContext()); + << token.GetUserSID()); } } @@ -1433,7 +1439,7 @@ namespace NKikimr::NDataStreams::V1 { } ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), - TStringBuilder() << "No such shard: " << ShardId, ActorContext()); + TStringBuilder() << "No such shard: " << ShardId); } void TGetShardIteratorActor::SendResponse(const TActorContext& ctx, const TShardIterator& shardIt) { @@ -1507,17 +1513,17 @@ namespace NKikimr::NDataStreams::V1 { if (ShardIterator.IsExpired()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_ITERATOR), - TStringBuilder() << "Provided shard iterator is expired", ctx); + TStringBuilder() << "Provided shard iterator is expired"); } if (!ShardIterator.IsValid()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - TStringBuilder() << "Provided shard iterator is malformed", ctx); + TStringBuilder() << "Provided shard iterator is malformed"); } Limit = Limit == 0 ? MAX_LIMIT : Limit; if (Limit < 1 || Limit > MAX_LIMIT) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), - TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx); + TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]"); } SendDescribeProposeRequest(ctx, ShardIterator.IsCdcTopic()); @@ -1579,7 +1585,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access to stream " << ShardIterator.GetStreamName() << " is denied for subject " - << token.GetUserSID(), ActorContext()); + << token.GetUserSID()); } } @@ -1597,7 +1603,7 @@ namespace NKikimr::NDataStreams::V1 { } ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), - TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ActorContext()); + TStringBuilder() << "No such shard: " << ShardIterator.GetShardId()); } void TGetRecordsActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { @@ -1619,7 +1625,7 @@ namespace NKikimr::NDataStreams::V1 { default: return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()), ConvertOldCode(record.GetErrorCode()), - record.GetErrorReason(), ctx); + record.GetErrorReason()); } break; default: {} @@ -1661,16 +1667,16 @@ namespace NKikimr::NDataStreams::V1 { } } - void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, - TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId); } } - void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) { ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, - TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId); } void TGetRecordsActor::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { @@ -1769,21 +1775,21 @@ namespace NKikimr::NDataStreams::V1 { if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_name().empty()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION), - TStringBuilder() << "StreamName and NextToken can not be provided together", ctx); + TStringBuilder() << "StreamName and NextToken can not be provided together"); } if (NextToken.IsExpired()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN), - TStringBuilder() << "Provided next token is expired", ctx); + TStringBuilder() << "Provided next token is expired"); } if (!NextToken.IsValid()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), - TStringBuilder() << "Provided next token is malformed", ctx); + TStringBuilder() << "Provided next token is malformed"); } if (!TShardFilter::ShardFilterType_IsValid(ShardFilter.type())) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard filter '" << - (ui32)ShardFilter.type() << "' is not known", ctx); + (ui32)ShardFilter.type() << "' is not known"); } MaxResults = MaxResults == 0 ? DEFAULT_MAX_RESULTS : MaxResults; @@ -1791,13 +1797,13 @@ namespace NKikimr::NDataStreams::V1 { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), TStringBuilder() << "Max results '" << MaxResults << "' is out of bound [" << MIN_MAX_RESULTS << "; " << - MAX_MAX_RESULTS << "]", ctx); + MAX_MAX_RESULTS << "]"); } if (ShardFilter.type() == TShardFilter::AFTER_SHARD_ID && ShardFilter.shard_id() == "") { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::MISSING_PARAMETER), TStringBuilder() << "Shard filter type is AFTER_SHARD_ID," << - " but no ShardId provided", ctx); + " but no ShardId provided"); } SendDescribeProposeRequest(ctx); @@ -1831,7 +1837,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << token.GetUserSID(), ActorContext()); + << token.GetUserSID()); } } @@ -1888,7 +1894,7 @@ namespace NKikimr::NDataStreams::V1 { if (alreadyRead > (ui32)partitions.size()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " - "everything is already read", ctx); + "everything is already read"); } const auto shardsToGet = std::min(partitions.size() - alreadyRead, MaxResults); @@ -1944,16 +1950,16 @@ namespace NKikimr::NDataStreams::V1 { } } - void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, - TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId); } } - void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) { ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, - TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId); } void TListShardsActor::SendResponse(const TActorContext& ctx) { diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index f66c46af46b..645a1ab7eeb 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -282,7 +282,7 @@ namespace NKikimr::NDataStreams::V1 { if (!error.empty()) { return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, - error, ctx); + error); } if (this->Request_->GetSerializedToken().empty()) { @@ -291,7 +291,7 @@ namespace NKikimr::NDataStreams::V1 { Ydb::PersQueue::ErrorCode::ACCESS_DENIED, TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() - << " is denied", ctx); + << " is denied"); } } NACLib::TUserToken token(this->Request_->GetSerializedToken()); @@ -332,7 +332,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access for stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << token.GetUserSID(), this->ActorContext()); + << token.GetUserSID()); } } @@ -346,7 +346,7 @@ namespace NKikimr::NDataStreams::V1 { Ydb::PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() << "write to mirrored stream " << this->GetProtoRequest()->stream_name() - << " is forbidden", this->ActorContext()); + << " is forbidden"); } @@ -526,10 +526,10 @@ namespace NKikimr::NDataStreams::V1 { if (putRecordsResult.records(0).error_code() == "ProvisionedThroughputExceededException" || putRecordsResult.records(0).error_code() == "ThrottlingException") { - return ReplyWithError(Ydb::StatusIds::OVERLOADED, Ydb::PersQueue::ErrorCode::OVERLOAD, putRecordsResult.records(0).error_message(), ctx); + return ReplyWithError(Ydb::StatusIds::OVERLOADED, Ydb::PersQueue::ErrorCode::OVERLOAD, putRecordsResult.records(0).error_message()); } //TODO: other codes - access denied and so on - return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, putRecordsResult.records(0).error_message(), ctx); + return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, putRecordsResult.records(0).error_message()); } } diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 2f4a76da2ac..73b85dd72af 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -29,9 +29,8 @@ namespace NKikimr::NGRpcProxy::V1 { return value == compareTo ? defaultValue : value; } - TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx) { + TClientServiceTypes GetSupportedClientServiceTypes(const NKikimrPQ::TPQConfig& pqConfig) { TClientServiceTypes serviceTypes; - const auto& pqConfig = AppData(ctx)->PQConfig; ui32 count = pqConfig.GetDefaultClientServiceType().GetMaxReadRulesCountPerTopic(); if (count == 0) count = Max<ui32>(); TString name = pqConfig.GetDefaultClientServiceType().GetName(); @@ -56,14 +55,13 @@ namespace NKikimr::NGRpcProxy::V1 { return serviceTypes; } - TString ReadRuleServiceTypeMigration(NKikimrPQ::TPQTabletConfig *config, const TActorContext& ctx) { + TString ReadRuleServiceTypeMigration(NKikimrPQ::TPQTabletConfig *config, const NKikimrPQ::TPQConfig& pqConfig) { auto rrServiceTypes = config->MutableReadRuleServiceTypes(); if (config->ReadRuleServiceTypesSize() > config->ReadRulesSize()) { rrServiceTypes->Clear(); } if (config->ReadRuleServiceTypesSize() < config->ReadRulesSize()) { rrServiceTypes->Reserve(config->ReadRulesSize()); - const auto& pqConfig = AppData(ctx)->PQConfig; if (pqConfig.GetDisallowDefaultClientServiceType()) { return "service type must be set for all read rules"; } @@ -78,10 +76,10 @@ namespace NKikimr::NGRpcProxy::V1 { NKikimrPQ::TPQTabletConfig* config, const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr, const TClientServiceTypes& supportedClientServiceTypes, - const TActorContext& ctx + const NKikimrPQ::TPQConfig& pqConfig ) { - auto consumerName = NPersQueue::ConvertNewConsumerName(rr.consumer_name(), ctx); + auto consumerName = NPersQueue::ConvertNewConsumerName(rr.consumer_name(), pqConfig); if (consumerName.empty()) { return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } @@ -92,7 +90,7 @@ namespace NKikimr::NGRpcProxy::V1 { ); } { - TString migrationError = ReadRuleServiceTypeMigration(config, ctx); + TString migrationError = ReadRuleServiceTypeMigration(config, pqConfig); if (migrationError) { return TMsgPqCodes(migrationError, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } @@ -184,7 +182,6 @@ namespace NKikimr::NGRpcProxy::V1 { config->AddReadRuleServiceTypes(rr.service_type()); } } else { - const auto& pqConfig = AppData(ctx)->PQConfig; if (pqConfig.GetDisallowDefaultClientServiceType()) { return TMsgPqCodes( TStringBuilder() << "service type cannot be empty for consumer '" << rr.consumer_name() << "'", @@ -221,9 +218,10 @@ namespace NKikimr::NGRpcProxy::V1 { const Ydb::Topic::Consumer& rr, const TClientServiceTypes& supportedClientServiceTypes, const bool checkServiceType, - const TActorContext& ctx + const NKikimrPQ::TPQConfig& pqConfig, + bool enableTopicDiskSubDomainQuota ) { - auto consumerName = NPersQueue::ConvertNewConsumerName(rr.name(), ctx); + auto consumerName = NPersQueue::ConvertNewConsumerName(rr.name(), pqConfig); if (consumerName.find("/") != TString::npos || consumerName.find("|") != TString::npos) { return TMsgPqCodes(TStringBuilder() << "consumer '" << rr.name() << "' has illegal symbols", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } @@ -231,7 +229,7 @@ namespace NKikimr::NGRpcProxy::V1 { return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } { - TString migrationError = ReadRuleServiceTypeMigration(config, ctx); + TString migrationError = ReadRuleServiceTypeMigration(config, pqConfig); if (migrationError) { return TMsgPqCodes(migrationError, migrationError.empty() ? Ydb::PersQueue::ErrorCode::OK : Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); //find better issueCode } @@ -261,7 +259,6 @@ namespace NKikimr::NGRpcProxy::V1 { } TString serviceType; - const auto& pqConfig = AppData(ctx)->PQConfig; const auto& defaultClientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); serviceType = defaultClientServiceType; @@ -349,7 +346,7 @@ namespace NKikimr::NGRpcProxy::V1 { } if (rr.important()) { - if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) { + if (pqConfig.GetTopicsAreFirstClassCitizen() && !enableTopicDiskSubDomainQuota) { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } consumer->SetImportant(true); @@ -366,7 +363,7 @@ namespace NKikimr::NGRpcProxy::V1 { NKikimrPQ::TPQTabletConfig* config, const NKikimrPQ::TPQTabletConfig& originalConfig, const TString& consumerName, - const TActorContext& ctx + const NKikimrPQ::TPQConfig& pqConfig ) { config->ClearReadRuleVersions(); config->ClearReadRules(); @@ -387,7 +384,6 @@ namespace NKikimr::NGRpcProxy::V1 { bool removed = false; - const auto& pqConfig = AppData(ctx)->PQConfig; if (NPQ::ReadRuleCompatible()) { for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { auto& readRule = originalConfig.GetReadRules(i); @@ -437,7 +433,7 @@ namespace NKikimr::NGRpcProxy::V1 { bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedClientServiceTypes, - TString& error, const TActorContext& ctx) { + TString& error, const NKikimrPQ::TPQConfig& pqConfig) { size_t consumerCount = NPQ::ConsumerCount(config); if (consumerCount > MAX_READ_RULES_COUNT) { @@ -470,7 +466,7 @@ namespace NKikimr::NGRpcProxy::V1 { } if (config.GetCodecs().IdsSize() > 0) { for (const auto& consumer : config.GetConsumers()) { - TString name = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); + TString name = NPersQueue::ConvertOldConsumerName(consumer.GetName(), pqConfig); if (consumer.GetCodec().IdsSize() > 0) { THashSet<i64> codecs; @@ -492,13 +488,13 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::StatusIds::StatusCode CheckConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedClientServiceTypes, - TString& error, const TActorContext& ctx, const Ydb::StatusIds::StatusCode dubsStatus) + TString& error, const NKikimrPQ::TPQConfig& pqConfig, const Ydb::StatusIds::StatusCode dubsStatus) { ui32 speed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(); ui32 burst = config.GetPartitionConfig().GetBurstSize(); std::set<ui32> validLimits {}; - if (AppData(ctx)->PQConfig.ValidWriteSpeedLimitsKbPerSecSize() == 0) { + if (pqConfig.ValidWriteSpeedLimitsKbPerSecSize() == 0) { validLimits.insert(speed); } else { const auto& limits = AppData()->PQConfig.GetValidWriteSpeedLimitsKbPerSec(); @@ -546,7 +542,7 @@ namespace NKikimr::NGRpcProxy::V1 { return Ydb::StatusIds::BAD_REQUEST; } - bool hasDuplicates = CheckReadRulesConfig(config, supportedClientServiceTypes, error, ctx); + bool hasDuplicates = CheckReadRulesConfig(config, supportedClientServiceTypes, error, pqConfig); return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? dubsStatus : Ydb::StatusIds::BAD_REQUEST); } @@ -935,14 +931,14 @@ namespace NKikimr::NGRpcProxy::V1 { } { - error = ReadRuleServiceTypeMigration(pqTabletConfig, ctx); + error = ReadRuleServiceTypeMigration(pqTabletConfig, pqConfig); if (error) { return Ydb::StatusIds::INTERNAL_ERROR; } } - const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); + const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(pqConfig); for (const auto& rr : settings.read_rules()) { - auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, ctx); + auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, pqConfig); if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) { error = messageAndCode.Message; return Ydb::StatusIds::BAD_REQUEST; @@ -1034,7 +1030,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST); + return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, pqConfig, Ydb::StatusIds::BAD_REQUEST); } static bool FillMeteringMode(Ydb::Topic::MeteringMode mode, NKikimrPQ::TPQTabletConfig& config, @@ -1074,10 +1070,10 @@ namespace NKikimr::NGRpcProxy::V1 { TYdbPqCodes FillProposeRequestImpl( const TString& name, const Ydb::Topic::CreateTopicRequest& request, - NKikimrSchemeOp::TModifyScheme& modifyScheme, const TActorContext& ctx, + NKikimrSchemeOp::TModifyScheme& modifyScheme, TAppData* appData, TString& error, const TString& path, const TString& database, const TString& localDc ) { - const auto& pqConfig = AppData(ctx)->PQConfig; + const auto& pqConfig = appData->PQConfig; modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); auto pqDescr = modifyScheme.MutableCreatePersQueueGroup(); @@ -1098,7 +1094,7 @@ namespace NKikimr::NGRpcProxy::V1 { return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } minParts = std::max<ui32>(1, settings.min_active_partitions()); - if (AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge() && request.has_partitioning_settings()) { + if (appData->FeatureFlags.GetEnableTopicSplitMerge() && request.has_partitioning_settings()) { auto pqTabletConfigPartStrategy = pqTabletConfig->MutablePartitionStrategy(); auto autoscaleSettings = settings.auto_partitioning_settings(); pqTabletConfigPartStrategy->SetMinPartitionCount(minParts); @@ -1214,7 +1210,7 @@ namespace NKikimr::NGRpcProxy::V1 { } { - error = ReadRuleServiceTypeMigration(pqTabletConfig, ctx); + error = ReadRuleServiceTypeMigration(pqTabletConfig, pqConfig); if (error) { return TYdbPqCodes(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } @@ -1225,23 +1221,25 @@ namespace NKikimr::NGRpcProxy::V1 { return TYdbPqCodes(code, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } - const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); + const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(pqConfig); for (const auto& rr : request.consumers()) { - auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, true, ctx); + auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, true, pqConfig, + appData->FeatureFlags.GetEnableTopicDiskSubDomainQuota()); if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) { error = messageAndCode.Message; return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, messageAndCode.PQCode); } } - return TYdbPqCodes(CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST), Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); + return TYdbPqCodes(CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, pqConfig, Ydb::StatusIds::BAD_REQUEST), + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } Ydb::StatusIds::StatusCode FillProposeRequestImpl( const Ydb::Topic::AlterTopicRequest& request, - NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx, + NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, TAppData* appData, TString& error, bool isCdcStream ) { #define CHECK_CDC if (isCdcStream) {\ @@ -1249,11 +1247,11 @@ namespace NKikimr::NGRpcProxy::V1 { return Ydb::StatusIds::BAD_REQUEST;\ } - const auto& pqConfig = AppData(ctx)->PQConfig; + const auto& pqConfig = appData->PQConfig; auto pqTabletConfig = pqDescr.MutablePQTabletConfig(); NPQ::Migrate(*pqTabletConfig); auto partConfig = pqTabletConfig->MutablePartitionConfig(); - auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge(); + auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge(); if (request.has_set_retention_storage_mb()) { CHECK_CDC; @@ -1363,7 +1361,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } { - error = ReadRuleServiceTypeMigration(pqTabletConfig, ctx); + error = ReadRuleServiceTypeMigration(pqTabletConfig, pqConfig); if (error) { return Ydb::StatusIds::INTERNAL_ERROR; } @@ -1374,7 +1372,7 @@ namespace NKikimr::NGRpcProxy::V1 { return code; } - const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); + const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(pqConfig); std::vector<std::pair<bool, Ydb::Topic::Consumer>> consumers; @@ -1383,7 +1381,7 @@ namespace NKikimr::NGRpcProxy::V1 { for (const auto& c : pqTabletConfig->GetConsumers()) { auto& oldName = c.GetName(); - auto name = NPersQueue::ConvertOldConsumerName(oldName, ctx); + auto name = NPersQueue::ConvertOldConsumerName(oldName, pqConfig); bool erase = false; for (auto consumer: request.drop_consumers()) { @@ -1418,7 +1416,7 @@ namespace NKikimr::NGRpcProxy::V1 { for (const auto& alter : request.alter_consumers()) { auto name = alter.name(); - auto oldName = NPersQueue::ConvertOldConsumerName(name, ctx); + auto oldName = NPersQueue::ConvertOldConsumerName(name, pqConfig); bool found = false; for (auto& consumer : consumers) { if (consumer.second.name() == name || consumer.second.name() == oldName) { @@ -1445,13 +1443,14 @@ namespace NKikimr::NGRpcProxy::V1 { pqTabletConfig->ClearConsumers(); for (const auto& rr : consumers) { - auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr.second, supportedClientServiceTypes, rr.first, ctx); + auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr.second, supportedClientServiceTypes, rr.first, + pqConfig, appData->FeatureFlags.GetEnableTopicDiskSubDomainQuota()); if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) { error = messageAndCode.Message; return Ydb::StatusIds::BAD_REQUEST; } } - return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); + return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, pqConfig, Ydb::StatusIds::ALREADY_EXISTS); } } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index b39ec377f58..fbeb92225f3 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -47,7 +47,7 @@ namespace NKikimr::NGRpcProxy::V1 { const TString& name, const Ydb::Topic::CreateTopicRequest& request, NKikimrSchemeOp::TModifyScheme& modifyScheme, - const TActorContext& ctx, + TAppData* appData, TString& error, const TString& path, const TString& database = TString(), @@ -57,7 +57,7 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::StatusIds::StatusCode FillProposeRequestImpl( const Ydb::Topic::AlterTopicRequest& request, NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, - const TActorContext& ctx, + TAppData* appData, TString& error, bool isCdcStream ); @@ -69,24 +69,24 @@ namespace NKikimr::NGRpcProxy::V1 { TVector<TString> PasswordHashes; }; typedef std::map<TString, TClientServiceType> TClientServiceTypes; - TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx); + TClientServiceTypes GetSupportedClientServiceTypes(const NKikimrPQ::TPQConfig& pqConfig); // Returns true if have duplicated read rules Ydb::StatusIds::StatusCode CheckConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, - TString& error, const TActorContext& ctx, + TString& error, const NKikimrPQ::TPQConfig& pqConfig, const Ydb::StatusIds::StatusCode dubsStatus = Ydb::StatusIds::BAD_REQUEST); TMsgPqCodes AddReadRuleToConfig( NKikimrPQ::TPQTabletConfig *config, const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr, const TClientServiceTypes& supportedReadRuleServiceTypes, - const TActorContext& ctx + const NKikimrPQ::TPQConfig& pqConfig ); TString RemoveReadRuleFromConfig( NKikimrPQ::TPQTabletConfig *config, const NKikimrPQ::TPQTabletConfig& originalConfig, const TString& consumerName, - const TActorContext& ctx + const NKikimrPQ::TPQConfig& pqConfig ); NYql::TIssue FillIssue(const TString &errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode); NYql::TIssue FillIssue(const TString &errorReason, const size_t errorCode); @@ -119,7 +119,7 @@ namespace NKikimr::NGRpcProxy::V1 { protected: virtual TString GetTopicPath() const = 0; - virtual void RespondWithCode(Ydb::StatusIds::StatusCode status) = 0; + virtual void RespondWithCode(Ydb::StatusIds::StatusCode status, bool notFound = false) = 0; virtual void AddIssue(const NYql::TIssue& issue) = 0; virtual bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const = 0; @@ -158,7 +158,7 @@ namespace NKikimr::NGRpcProxy::V1 { } AddIssue(FillIssue(TStringBuilder() << "path '" << JoinPath(entry.Path) << "' is not a topic", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR)); - RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); + RespondWithCode(Ydb::StatusIds::SCHEME_ERROR, true); return true; } @@ -202,7 +202,7 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::ACCESS_DENIED ) ); - return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR, true); } case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: { AddIssue( @@ -240,6 +240,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } + public: void StateWork(TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); @@ -296,7 +297,7 @@ namespace NKikimr::NGRpcProxy::V1 { protected: // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name); - void SendProposeRequest(const NActors::TActorContext &ctx) { + void SendProposeRequest(const NActors::TActorContext& ctx) { std::pair <TString, TString> pathPair; try { pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath()); @@ -316,7 +317,7 @@ namespace NKikimr::NGRpcProxy::V1 { if (this->Request_->GetSerializedToken().empty()) { if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, - "Unauthenticated access is forbidden, please provide credentials", ctx); + "Unauthenticated access is forbidden, please provide credentials"); } } else { proposal->Record.SetUserToken(this->Request_->GetSerializedToken()); @@ -363,21 +364,22 @@ namespace NKikimr::NGRpcProxy::V1 { } void ReplyWithError(Ydb::StatusIds::StatusCode status, size_t additionalStatus, - const TString& messageText, const NActors::TActorContext& ctx) { + const TString& messageText) { if (TActorBase::IsDead) return; this->Request_->RaiseIssue(FillIssue(messageText, additionalStatus)); this->Request_->ReplyWithYdbStatus(status); - this->Die(ctx); + this->Die(this->ActorContext()); TActorBase::IsDead = true; } - void RespondWithCode(Ydb::StatusIds::StatusCode status) override { + void RespondWithCode(Ydb::StatusIds::StatusCode status, bool notFound = false) override { if (TActorBase::IsDead) return; this->Request_->ReplyWithYdbStatus(status); this->Die(this->ActorContext()); TActorBase::IsDead = true; + Y_UNUSED(notFound); } template<class TProtoResult> @@ -403,25 +405,22 @@ namespace NKikimr::NGRpcProxy::V1 { //----------------------------------------------------------------------------------- - template<class TDerived, class TRequest> - class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest> { - using TBase = TPQGrpcSchemaBase<TDerived, TRequest>; - + template<class TDerived> + class TUpdateSchemeActorBase { public: - TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request, const TString& topicPath) - : TBase(request, topicPath) - {} - TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request) - : TBase(request) - {} - ~TUpdateSchemeActor() = default; + ~TUpdateSchemeActorBase() = default; - void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx, - const TString& workingDir, const TString& name) + virtual void ModifyPersqueueConfig(TAppData* appData, + NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, + const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, + const NKikimrSchemeOp::TDirEntry& selfInfo + ) = 0; + + protected: + void FillModifyScheme(NKikimrSchemeOp::TModifyScheme& modifyScheme, const TActorContext& ctx, + const TString& workingDir, const TString& name) { Y_UNUSED(name); - const auto& response = DescribeSchemeResult->Get()->Request.Get()->ResultSet.front(); - NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup); modifyScheme.SetWorkingDir(workingDir); @@ -430,6 +429,8 @@ namespace NKikimr::NGRpcProxy::V1 { } auto* config = modifyScheme.MutableAlterPersQueueGroup(); + const auto& response = DescribeSchemeResult->Get()->Request.Get()->ResultSet.front(); + Y_ABORT_UNLESS(response.Self); Y_ABORT_UNLESS(response.PQGroupInfo); config->CopyFrom(response.PQGroupInfo->Description); @@ -444,33 +445,64 @@ namespace NKikimr::NGRpcProxy::V1 { applyIf->SetPathVersion(response.Self->Info.GetPathVersion()); } - static_cast<TDerived*>(this)->ModifyPersqueueConfig( - ctx, + ModifyPersqueueConfig( + AppData(ctx), *config, response.PQGroupInfo->Description, response.Self->Info ); - - this->DescribeSchemeResult.Reset(); } - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); Y_ABORT_UNLESS(result->ResultSet.size() == 1); DescribeSchemeResult = std::move(ev); + } + protected: + THolder<NActors::TEventHandle<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DescribeSchemeResult; + }; + + + template<class TDerived, class TRequest> + class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest>, + public TUpdateSchemeActorBase<TDerived> { + using TBase = TPQGrpcSchemaBase<TDerived, TRequest>; + using TUpdateSchemeBase = TUpdateSchemeActorBase<TDerived>; + + public: + TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request, const TString& topicPath) + : TBase(request, topicPath) + {} + TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request) + : TBase(request) + {} + ~TUpdateSchemeActor() = default; + + + void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx, + const TString& workingDir, const TString& name) + { + NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); + this->FillModifyScheme(modifyScheme, ctx, workingDir, name); + this->DescribeSchemeResult.Reset(); + } + + + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override { + TUpdateSchemeBase::HandleCacheNavigateResponse(ev); return this->SendProposeRequest(this->ActorContext()); } + void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) { auto msg = ev->Get(); const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(ev->Get()->Record.GetStatus()); - if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed) + if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed) { return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED, Ydb::PersQueue::ErrorCode::OVERLOAD, - TStringBuilder() << "Topic with name " << TBase::GetTopicPath() << " has another alter in progress", - ctx); + TStringBuilder() << "Topic with name " << TBase::GetTopicPath() << " has another alter in progress"); } return TBase::TBase::Handle(ev, ctx); @@ -482,12 +514,8 @@ namespace NKikimr::NGRpcProxy::V1 { default: TBase::StateWork(ev); } } - - protected: - THolder<NActors::TEventHandle<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DescribeSchemeResult; }; - template<class TDerived, class TRequest, class TEvResponse> class TPQInternalSchemaActor : public TPQSchemaBase<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>> , public TActorBootstrapped<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>> @@ -542,19 +570,32 @@ namespace NKikimr::NGRpcProxy::V1 { Response->Issues.AddIssue(issue); } - - void RespondWithCode(Ydb::StatusIds::StatusCode status) override { - Response->Status = status; - this->ActorContext().Send(Requester, Response.Release()); + void RespondWithCode(Ydb::StatusIds::StatusCode status, bool notFound = false) override { + if (!RespondOverride(status, notFound)) { + Response->Status = status; + this->ActorContext().Send(Requester, Response.Release()); + } this->Die(this->ActorContext()); TBase::IsDead = true; } + protected: + const TRequest& GetRequest() const { + return Request; + } + + virtual bool RespondOverride(Ydb::StatusIds::StatusCode status, bool notFound) { + Y_UNUSED(status); + Y_UNUSED(notFound); + return false; + } + private: TRequest Request; TActorId Requester; TMaybe<TString> PrivateTopicName; + protected: THolder<TEvResponse> Response; TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index d311d8bfa77..96f3471b5ce 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -25,6 +25,16 @@ using namespace Ydb; // ui64 Cookie; // }; +struct TLocalResponseBase { + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + + +struct TAlterTopicResponse : public TLocalResponseBase { + NKikimrSchemeOp::TModifyScheme ModifyScheme; +}; + struct TEvPQProxy { enum EEv { EvWriteInit = EventSpaceBegin(TKikimrEvents::ES_PQ_PROXY_NEW), // TODO: Replace 'NEW' with version or something @@ -77,6 +87,7 @@ struct TEvPQProxy { EvDirectReadSendClientData, EvReadingStarted, EvReadingFinished, + EvAlterTopicResponse, EvEnd }; @@ -490,11 +501,6 @@ struct TEvPQProxy { ui64 TabletId; }; - struct TLocalResponseBase { - Ydb::StatusIds::StatusCode Status; - NYql::TIssues Issues; - }; - struct TPartitionLocationInfo { ui64 PartitionId; ui64 Generation; @@ -635,6 +641,11 @@ struct TEvPQProxy { std::vector<ui32> AdjacentPartitionIds; std::vector<ui32> ChildPartitionIds; }; + + struct TEvAlterTopicResponse : public TEventLocal<TEvAlterTopicResponse, EvAlterTopicResponse> + , public TLocalResponseBase { + TAlterTopicResponse Response; + }; }; struct TLocalRequestBase { @@ -662,4 +673,23 @@ struct TGetPartitionsLocationRequest : public TLocalRequestBase { TVector<ui32> PartitionIds; }; + +struct TAlterTopicRequest : public TLocalRequestBase { + TAlterTopicRequest(Ydb::Topic::AlterTopicRequest&& request, const TString& workDir, const TString& name, + const TString& database, const TString& token, bool missingOk) + : TLocalRequestBase(request.path(), database, token) + , Request(std::move(request)) + , WorkingDir(workDir) + , Name(name) + , MissingOk(missingOk) + {} + + Ydb::Topic::AlterTopicRequest Request; + TString WorkingDir; + TString Name; + bool MissingOk; +}; + + + } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index b953a9f0198..c68c6fe205a 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -222,31 +222,32 @@ void TAddReadRuleActor::Bootstrap(const NActors::TActorContext& ctx) { } void TAddReadRuleActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { Y_UNUSED(pqGroupDescription); - auto* pqConfig = groupConfig.MutablePQTabletConfig(); + auto* tabletConfig = groupConfig.MutablePQTabletConfig(); + const auto& pqConfig = appData->PQConfig; auto rule = GetProtoRequest()->read_rule(); if (rule.version() == 0) { rule.set_version(selfInfo.GetVersion().GetPQVersion()); } - auto serviceTypes = GetSupportedClientServiceTypes(ctx); + auto serviceTypes = GetSupportedClientServiceTypes(pqConfig); TString error; - auto messageAndCode = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx); + auto messageAndCode = AddReadRuleToConfig(tabletConfig, rule, serviceTypes, pqConfig); auto status = messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK ? - CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS) + CheckConfig(*tabletConfig, serviceTypes, messageAndCode.Message, pqConfig, Ydb::StatusIds::ALREADY_EXISTS) : Ydb::StatusIds::BAD_REQUEST; if (status != Ydb::StatusIds::SUCCESS) { return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? Ydb::PersQueue::ErrorCode::OK : Ydb::PersQueue::ErrorCode::BAD_REQUEST, - messageAndCode.Message, ctx); + messageAndCode.Message); } } @@ -263,7 +264,7 @@ void TRemoveReadRuleActor::Bootstrap(const NActors::TActorContext& ctx) { } void TRemoveReadRuleActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo @@ -274,10 +275,10 @@ void TRemoveReadRuleActor::ModifyPersqueueConfig( groupConfig.MutablePQTabletConfig(), pqGroupDescription.GetPQTabletConfig(), GetProtoRequest()->consumer_name(), - ctx + appData->PQConfig ); if (!error.Empty()) { - return ReplyWithError(Ydb::StatusIds::NOT_FOUND, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return ReplyWithError(Ydb::StatusIds::NOT_FOUND, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error); } } @@ -373,7 +374,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction { TString error; - auto status = FillProposeRequestImpl(name, *GetProtoRequest(), modifyScheme, ctx, error, + auto status = FillProposeRequestImpl(name, *GetProtoRequest(), modifyScheme, AppData(ctx), error, workingDir, proposal.Record.GetDatabaseName(), LocalCluster).YdbCode; if (!error.empty()) { @@ -426,7 +427,7 @@ void TAlterTopicActor::Bootstrap(const NActors::TActorContext& ctx) { } void TAlterTopicActor::ModifyPersqueueConfig( - const TActorContext& ctx, + TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo @@ -436,7 +437,7 @@ void TAlterTopicActor::ModifyPersqueueConfig( TString error; Y_UNUSED(selfInfo); - auto status = FillProposeRequestImpl(*GetProtoRequest(), groupConfig, ctx, error, GetCdcStreamName().Defined()); + auto status = FillProposeRequestImpl(*GetProtoRequest(), groupConfig, appData, error, GetCdcStreamName().Defined()); if (!error.empty()) { Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); return RespondWithCode(status); @@ -1564,4 +1565,60 @@ void TPartitionsLocationActor::RaiseError(const TString& error, const Ydb::PersQ this->RespondWithCode(status); } +TAlterTopicActorInternal::TAlterTopicActorInternal( + TAlterTopicActorInternal::TRequest&& request, + NThreading::TPromise<TAlterTopicResponse>&& promise, + bool missingOk +) + : TActorBase(std::move(request), TActorId{}) + , Promise(std::move(promise)) + , MissingOk(missingOk) +{} + +void TAlterTopicActorInternal::Bootstrap(const NActors::TActorContext&) { + SendDescribeProposeRequest(); + Become(&TAlterTopicActorInternal::StateWork); +} + +void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (!TActorBase::HandleCacheNavigateResponseBase(ev)) { + this->Die(ActorContext()); + return; + } + TUpdateSchemeBase::HandleCacheNavigateResponse(ev); + auto& schemeTx = Response->Response.ModifyScheme; + FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name); +} + +void TAlterTopicActorInternal::ModifyPersqueueConfig( + TAppData* appData, + NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, + const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, + const NKikimrSchemeOp::TDirEntry& selfInfo +) { + Y_UNUSED(pqGroupDescription); + Y_UNUSED(selfInfo); + TString error; + Y_UNUSED(selfInfo); + + auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false); + if (!error.empty()) { + Response->Response.Issues.AddIssue(error); + } + RespondWithCode(status); +} + +bool TAlterTopicActorInternal::RespondOverride(Ydb::StatusIds::StatusCode status, bool notFound) { + if (MissingOk && notFound) { + Response->Response.Status = Ydb::StatusIds::SUCCESS; + Response->Response.ModifyScheme.Clear(); + + } else { + Response->Response.Status = status; + Response->Response.Issues.AddIssues(std::move(Response->Issues)); + } + Promise.SetValue(std::move(Response->Response)); + return true; +} + } // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index 6b37cb91138..b6c00f7cdb1 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -282,10 +282,10 @@ public: TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request); void Bootstrap(const NActors::TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; }; class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest> @@ -297,10 +297,10 @@ public: TRemoveReadRuleActor(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest* request); void Bootstrap(const NActors::TActorContext &ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; }; @@ -374,13 +374,47 @@ public: TAlterTopicActor(NKikimr::NGRpcService::IRequestOpCtx* request); void Bootstrap(const NActors::TActorContext& ctx); - void ModifyPersqueueConfig(const TActorContext& ctx, + void ModifyPersqueueConfig(TAppData* appData, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, - const NKikimrSchemeOp::TDirEntry& selfInfo); + const NKikimrSchemeOp::TDirEntry& selfInfo) override; }; +class TAlterTopicActorInternal : public TPQInternalSchemaActor<TAlterTopicActorInternal, NKikimr::NGRpcProxy::V1::TAlterTopicRequest, + TEvPQProxy::TEvAlterTopicResponse> + , public TUpdateSchemeActorBase<TAlterTopicActorInternal> + , public TCdcStreamCompatible +{ + using TUpdateSchemeBase = TUpdateSchemeActorBase<TAlterTopicActorInternal>; + using TRequest = NKikimr::NGRpcProxy::V1::TAlterTopicRequest; + using TActorBase = TPQInternalSchemaActor<TAlterTopicActorInternal, TRequest, TEvPQProxy::TEvAlterTopicResponse>; + +public: + TAlterTopicActorInternal(NKikimr::NGRpcProxy::V1::TAlterTopicRequest&& request, + NThreading::TPromise<TAlterTopicResponse>&& promise, + bool notExistsOk); + + void Bootstrap(const NActors::TActorContext& ctx) override; + void ModifyPersqueueConfig(TAppData* appData, + NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, + const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, + const NKikimrSchemeOp::TDirEntry& selfInfo) override; + + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override; + + void StateWork(TAutoPtr<IEventHandle>& ev) { + TActorBase::StateWork(ev); + } + +protected: + bool RespondOverride(Ydb::StatusIds::StatusCode status, bool notFound) override; + +private: + NThreading::TPromise<TAlterTopicResponse> Promise; + bool MissingOk; +}; + class TPartitionsLocationActor : public TPQInternalSchemaActor<TPartitionsLocationActor, TGetPartitionsLocationRequest, TEvPQProxy::TEvPartitionLocationResponse> |
