summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFloatingCrowbar <[email protected]>2024-08-08 17:03:42 +0300
committerGitHub <[email protected]>2024-08-08 14:03:42 +0000
commitbb76355267b4b29abca62e1a0ba1ee797bca7f54 (patch)
tree4bfc5cf171f296e1eb91cd676e2f2003caad8658
parentf7c4264a7287d30f19fc67ebcf41907f1e9fa01d (diff)
Topic control plane DDL support for query service (#7438)
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp12
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp12
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp14
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h12
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp27
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp33
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp119
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp12
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp79
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_expr_nodes.json11
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h16
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp262
-rw-r--r--ydb/core/protos/kqp_physical.proto3
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator.cpp7
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.cpp17
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h2
-rw-r--r--ydb/library/yql/sql/v1/SQLv1.g.in6
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format.cpp18
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format_ut.cpp20
-rw-r--r--ydb/library/yql/sql/v1/node.h9
-rw-r--r--ydb/library/yql/sql/v1/query.cpp17
-rw-r--r--ydb/library/yql/sql/v1/sql_query.cpp42
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp200
-rw-r--r--ydb/services/datastreams/put_records_actor.h12
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp79
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h131
-rw-r--r--ydb/services/persqueue_v1/actors/events.h40
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp81
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.h46
29 files changed, 989 insertions, 350 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
index f498a4ee092..55600858669 100644
--- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
@@ -34,14 +34,14 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
public:
TAlterConfigsActor(
- TActorId requester,
+ TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
std::optional<ui64> retentionMs,
std::optional<ui64> retentionBytes)
: TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>(
- requester,
+ requester,
userToken,
topicPath,
databaseName)
@@ -54,12 +54,12 @@ public:
~TAlterConfigsActor() = default;
void ModifyPersqueueConfig(
- const TActorContext& ctx,
+ NKikimr::TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
- Y_UNUSED(ctx);
+ Y_UNUSED(appData);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);
@@ -150,7 +150,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
resource.ResourceName.value(),
Context->DatabasePath,
convertedRetentions.Ms,
- convertedRetentions.Bytes
+ convertedRetentions.Bytes
));
InflyTopics++;
@@ -201,7 +201,7 @@ void TKafkaAlterConfigsActor::Reply(const TActorContext& ctx) {
responseResource.ErrorCode = INVALID_REQUEST;
response->Responses.push_back(responseResource);
responseStatus = INVALID_REQUEST;
- }
+ }
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
index 6e13bd61cc7..521f8598f9b 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
@@ -215,13 +215,13 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
public:
TCreatePartitionsActor(
- TActorId requester,
+ TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
ui32 partitionsNumber)
: TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest>(
- requester,
+ requester,
userToken,
topicPath,
databaseName)
@@ -234,12 +234,12 @@ public:
};
void ModifyPersqueueConfig(
- const TActorContext& ctx,
+ NKikimr::TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
- ) {
- Y_UNUSED(ctx);
+ ) override {
+ Y_UNUSED(appData);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);
@@ -347,7 +347,7 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) {
response->Results.push_back(responseTopic);
responseStatus = INVALID_REQUEST;
- }
+ }
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
Die(ctx);
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
index 3fad0055a1b..44af7beb5ff 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
@@ -16,7 +16,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
public:
TCreateTopicActor(
- TActorId requester,
+ TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
@@ -78,13 +78,13 @@ public:
name,
topicRequest,
modifyScheme,
- ctx,
+ NKikimr::AppData(ctx),
error,
workingDir,
proposal.Record.GetDatabaseName()
);
if (codes.YdbCode != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx);
+ return ReplyWithError(codes.YdbCode, codes.PQCode, error);
}
};
@@ -192,7 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
TopicNamesToRetentions[topicName] = std::pair<std::optional<ui64>, std::optional<ui64>>(
convertedRetentions.Ms,
- convertedRetentions.Bytes
+ convertedRetentions.Bytes
);
ctx.Register(new TCreateTopicActor(
@@ -202,7 +202,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
Context->DatabasePath,
topic.NumPartitions,
convertedRetentions.Ms,
- convertedRetentions.Bytes
+ convertedRetentions.Bytes
));
InflyTopics++;
@@ -243,7 +243,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;
}
- auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
+ auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
if (configValue.has_value()) {
TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs config;
config.Name = configName;
@@ -271,7 +271,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
responseTopic.ErrorMessage = "Duplicate topic in request.";
response->Topics.push_back(responseTopic);
responseStatus = INVALID_REQUEST;
- }
+ }
Send(Context->ConnectionId,
new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index f3bacc32b39..14855d40c1d 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -208,8 +208,8 @@ struct TGetOffsetsRequest : public NKikimr::NGRpcProxy::V1::TLocalRequestBase {
TVector<ui32> PartitionIds;
};
-struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
- , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
+struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
+ , public NKikimr::NGRpcProxy::V1::TLocalResponseBase
{
TEvTopicOffsetsResponse()
{}
@@ -217,8 +217,8 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp
TVector<TPartitionOffsetsInfo> Partitions;
};
-struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
- , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
+struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
+ , public NKikimr::NGRpcProxy::V1::TLocalResponseBase
{
TEvCommitedOffsetsResponse()
{}
@@ -228,8 +228,8 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
};
-struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
- , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
+struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
+ , public NKikimr::NGRpcProxy::V1::TLocalResponseBase
{
enum EStatus {
OK,
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index f61cb111663..15835cf536e 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -308,11 +308,29 @@ public:
break;
}
+ case NKqpProto::TKqpSchemeOperation::kCreateTopic: {
+ const auto& modifyScheme = schemeOp.GetCreateTopic();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
+ case NKqpProto::TKqpSchemeOperation::kAlterTopic: {
+ const auto& modifyScheme = schemeOp.GetAlterTopic();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
+ case NKqpProto::TKqpSchemeOperation::kDropTopic: {
+ const auto& modifyScheme = schemeOp.GetDropTopic();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
case NKqpProto::TKqpSchemeOperation::kAnalyzeTable: {
const auto& analyzeOperation = schemeOp.GetAnalyzeTable();
-
+
auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>();
-
+
TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()};
IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise);
@@ -326,9 +344,10 @@ public:
actorSystem->Send(selfId, ev.Release());
});
-
+
Become(&TKqpSchemeExecuter::ExecuteState);
return;
+
}
default:
@@ -459,7 +478,7 @@ public:
}
void Handle(TEvPrivate::TEvMakeTempDirResult::TPtr& result) {
- if (!result->Get()->Result.Success()) {
+ if (!result->Get()->Result.Success()) {
InternalError(TStringBuilder()
<< "Error creating temporary directory for session " << SessionId
<< ": " << result->Get()->Result.Issues().ToString(true));
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 4990244c7a9..f33bd700ae3 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -977,7 +977,11 @@ public:
return NotImplemented<TGenericResult>();
}
- TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
+ TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override {
+ if (existingOk) {
+ return MakeFuture(ResultFromError<TGenericResult>("IF NOT EXISTS statement is not supported for CREATE TOPIC in yql script"));
+ }
+
try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
@@ -989,9 +993,27 @@ public:
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
+ Y_UNUSED(existingOk);
}
- TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override {
+ TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(NYql::TAlterTopicSettings&& settings) override {
+ auto schemaTxPromise = NewPromise<NKikimr::NGRpcProxy::V1::TAlterTopicResponse>();
+ auto schemaTxFuture = schemaTxPromise.GetFuture();
+
+ NKikimr::NGRpcProxy::V1::TAlterTopicRequest request{
+ std::move(settings.Request), settings.WorkDir, settings.Name, Database, GetTokenCompat(),
+ settings.MissingOk
+ };
+ IActor* requestHandler = new NKikimr::NGRpcProxy::V1::TAlterTopicActorInternal(std::move(request), std::move(schemaTxPromise), settings.MissingOk);
+ RegisterActor(requestHandler);
+ return schemaTxFuture;
+ }
+
+ TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override {
+ if (missingOk) {
+ return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for ALTER TOPIC in yql script"));
+ }
+
try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
@@ -1005,7 +1027,11 @@ public:
}
}
- TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override {
+ TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override {
+ if (missingOk) {
+ return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for DROP TOPIC in yql script"));
+ }
+
try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
@@ -1020,6 +1046,7 @@ public:
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
+
}
TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override {
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 901e6e3066c..87f6d092118 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/parser.h>
#include <ydb/services/metadata/abstract/kqp_common.h>
+#include <ydb/services/lib/actors/pq_schema_actor.h>
namespace NKikimr::NKqp {
@@ -108,7 +109,7 @@ bool ConvertDataSlotToYdbTypedValue(NYql::EDataSlot fromType, const TString& fro
case NYql::EDataSlot::Interval64:
toType->set_type_id(Ydb::Type::INTERVAL64);
toValue->set_int64_value(FromString<i64>(fromValue));
- break;
+ break;
default:
return false;
}
@@ -907,16 +908,120 @@ public:
return dropPromise.GetFuture();
}
- TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
- FORWARD_ENSURE_NO_PREPARE(CreateTopic, cluster, std::move(request));
+ TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override {
+ CHECK_PREPARED_DDL(CreateTopic);
+ Y_UNUSED(cluster);
+
+ std::pair<TString, TString> pathPair;
+ TString error;
+ auto createPromise = NewPromise<TGenericResult>();
+ if (!NSchemeHelpers::SplitTablePath(request.path(), GetDatabase(), pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ NKikimrSchemeOp::TModifyScheme schemeTx;
+ schemeTx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup);
+
+ schemeTx.SetWorkingDir(pathPair.first);
+
+ auto pqDescr = schemeTx.MutableCreatePersQueueGroup();
+ pqDescr->SetName(pathPair.second);
+ NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(pathPair.second, request, schemeTx, AppData(ActorSystem), error, pathPair.first);
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+
+
+ phyTx.MutableSchemeOperation()->MutableCreateTopic()->Swap(&schemeTx);
+ phyTx.MutableSchemeOperation()->MutableCreateTopic()->SetFailedOnAlreadyExists(!existingOk);
+ TGenericResult result;
+ result.SetSuccess();
+ createPromise.SetValue(result);
+ } else {
+ return Gateway->CreateTopic(cluster, std::move(request), existingOk);
+ }
+ return createPromise.GetFuture();
}
- TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override {
- FORWARD_ENSURE_NO_PREPARE(AlterTopic, cluster, std::move(request));
+ TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override {
+ CHECK_PREPARED_DDL(AlterTopic);
+ Y_UNUSED(cluster);
+ std::pair<TString, TString> pathPair;
+ TString error;
+ if (!NSchemeHelpers::SplitTablePath(request.path(), GetDatabase(), pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ auto alterPromise = NewPromise<TGenericResult>();
+
+ if (IsPrepare()) {
+ TAlterTopicSettings settings{std::move(request), pathPair.second, pathPair.first, missingOk};
+ auto getModifySchemeFuture = Gateway->AlterTopicPrepared(std::move(settings));
+
+
+ auto* phyQuery = SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+
+ getModifySchemeFuture.Subscribe([=] (const auto future) mutable {
+ TGenericResult result;
+ auto modifySchemeResult = future.GetValue();
+ if (modifySchemeResult.Status == Ydb::StatusIds::SUCCESS) {
+ if (modifySchemeResult.ModifyScheme.HasAlterPersQueueGroup()) {
+ auto* phyTx = phyQuery->AddTransactions();
+ phyTx->SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ phyTx->MutableSchemeOperation()->MutableAlterTopic()->Swap(&modifySchemeResult.ModifyScheme);
+ phyTx->MutableSchemeOperation()->MutableAlterTopic()->SetSuccessOnNotExist(missingOk);
+ }
+ result.SetSuccess();
+
+ } else {
+ result.SetStatus(NYql::YqlStatusFromYdbStatus(modifySchemeResult.Status));
+ result.AddIssues(modifySchemeResult.Issues);
+ }
+ alterPromise.SetValue(result);
+ });
+
+ } else {
+ return Gateway->AlterTopic(cluster, std::move(request), missingOk);
+ }
+ return alterPromise.GetFuture();
+
}
- TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override {
- FORWARD_ENSURE_NO_PREPARE(DropTopic, cluster, topic);
+ NThreading::TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(TAlterTopicSettings&& settings) override {
+ return Gateway->AlterTopicPrepared(std::move(settings));
+ }
+
+ TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override {
+ CHECK_PREPARED_DDL(DropTopic);
+ Y_UNUSED(cluster);
+
+ std::pair<TString, TString> pathPair;
+ TString error;
+ auto dropPromise = NewPromise<TGenericResult>();
+ if (!NSchemeHelpers::SplitTablePath(topic, GetDatabase(), pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+
+ NKikimrSchemeOp::TModifyScheme schemeTx;
+ schemeTx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup);
+
+ schemeTx.SetWorkingDir(pathPair.first);
+ schemeTx.MutableDrop()->SetName(pathPair.second);
+
+ phyTx.MutableSchemeOperation()->MutableDropTopic()->Swap(&schemeTx);
+ phyTx.MutableSchemeOperation()->MutableDropTopic()->SetSuccessOnNotExist(missingOk);
+ TGenericResult result;
+ result.SetSuccess();
+ dropPromise.SetValue(result);
+ } else {
+ return Gateway->DropTopic(cluster, topic, missingOk);
+ }
+ return dropPromise.GetFuture();
}
TFuture<TGenericResult> ModifyPermissions(const TString& cluster,
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index c1675eeba43..0831fb24ee6 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -1168,7 +1168,8 @@ public:
YQL_ENSURE(settings.Mode);
auto mode = settings.Mode.Cast();
- if (mode == "create") {
+ if (mode == "create" || mode == "create_if_not_exists") {
+ bool existingOk = mode == "create_if_not_exists";
return Build<TKiCreateTopic>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
@@ -1176,9 +1177,11 @@ public:
.TopicSettings(settings.TopicSettings.Cast())
.Consumers(settings.Consumers.Cast())
.Settings(settings.Other)
+ .ExistingOk<TCoAtom>().Value(existingOk).Build()
.Done()
.Ptr();
- } else if (mode == "alter") {
+ } else if (mode == "alter" || mode == "alter_if_exists") {
+ bool missingOk = mode == "alter_if_exists";
return Build<TKiAlterTopic>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
@@ -1188,14 +1191,17 @@ public:
.AlterConsumers(settings.AlterConsumers.Cast())
.DropConsumers(settings.DropConsumers.Cast())
.Settings(settings.Other)
+ .MissingOk<TCoAtom>().Value(missingOk).Build()
.Done()
.Ptr();
- } else if (mode == "drop") {
+ } else if (mode == "drop" || mode == "drop_if_exists") {
+ bool missingOk = (mode == "drop_if_exists");
return Build<TKiDropTopic>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.Topic().Build(key.GetTopicPath())
.Settings(settings.Other)
+ .MissingOk<TCoAtom>().Value(missingOk).Build()
.Done()
.Ptr();
} else {
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index bfa76ad0d18..3f83b415947 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1968,38 +1968,6 @@ public:
});
}
- if (auto maybeCreate = TMaybeNode<TKiCreateTopic>(input)) {
- auto requireStatus = RequireChild(*input, 0);
- if (requireStatus.Level != TStatus::Ok) {
- return SyncStatus(requireStatus);
- }
- auto cluster = TString(maybeCreate.Cast().DataSink().Cluster());
- TString topicName = TString(maybeCreate.Cast().Topic());
- Ydb::Topic::CreateTopicRequest createReq;
- createReq.set_path(topicName);
- for (const auto& consumer : maybeCreate.Cast().Consumers()) {
- auto error = AddConsumerToTopicRequest(createReq.add_consumers(), consumer);
- if (!error.empty()) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << error << input->Content()));
- return SyncError();
- }
- }
- AddTopicSettingsToRequest(&createReq,maybeCreate.Cast().TopicSettings());
- bool prepareOnly = SessionCtx->Query().PrepareOnly;
- // DEBUG
- // Cerr << "Create topic request proto: " << createReq.DebugString() << Endl;
- auto future = prepareOnly ? CreateDummySuccess() : (
- Gateway->CreateTopic(cluster, std::move(createReq))
- );
-
- return WrapFuture(future,
- [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
- Y_UNUSED(res);
- auto resultNode = ctx.NewWorld(input->Pos());
- return resultNode;
- }, "Executing CREATE TOPIC");
- }
-
if (auto maybeCreateSequence = TMaybeNode<TKiCreateSequence>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
@@ -2061,6 +2029,35 @@ public:
}, "Executing CREATE SEQUENCE");
}
+ if (auto maybeCreate = TMaybeNode<TKiCreateTopic>(input)) {
+ auto requireStatus = RequireChild(*input, 0);
+ if (requireStatus.Level != TStatus::Ok) {
+ return SyncStatus(requireStatus);
+ }
+ auto cluster = TString(maybeCreate.Cast().DataSink().Cluster());
+ TString topicName = TString(maybeCreate.Cast().Topic());
+ Ydb::Topic::CreateTopicRequest createReq;
+ createReq.set_path(topicName);
+ for (const auto& consumer : maybeCreate.Cast().Consumers()) {
+ auto error = AddConsumerToTopicRequest(createReq.add_consumers(), consumer);
+ if (!error.empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << error << input->Content()));
+ return SyncError();
+ }
+ }
+ AddTopicSettingsToRequest(&createReq,maybeCreate.Cast().TopicSettings());
+ bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1");
+
+ auto future = Gateway->CreateTopic(cluster, std::move(createReq), existingOk);
+
+ return WrapFuture(future,
+ [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
+ Y_UNUSED(res);
+ auto resultNode = ctx.NewWorld(input->Pos());
+ return resultNode;
+ }, "Executing CREATE TOPIC");
+ }
+
if (auto maybeAlter = TMaybeNode<TKiAlterTopic>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
@@ -2088,13 +2085,9 @@ public:
auto name = consumer.Cast<TCoAtom>().StringValue();
alterReq.add_drop_consumers(name);
}
+ bool missingOk = (maybeAlter.MissingOk().Cast().Value() == "1");
AddAlterTopicSettingsToRequest(&alterReq, maybeAlter.Cast().TopicSettings());
- bool prepareOnly = SessionCtx->Query().PrepareOnly;
- // DEBUG
- // Cerr << "Alter topic request proto:\n" << alterReq.DebugString() << Endl;
- auto future = prepareOnly ? CreateDummySuccess() : (
- Gateway->AlterTopic(cluster, std::move(alterReq))
- );
+ auto future = Gateway->AlterTopic(cluster, std::move(alterReq), missingOk);
return WrapFuture(future,
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
@@ -2105,21 +2098,15 @@ public:
}
if (auto maybeDrop = TMaybeNode<TKiDropTopic>(input)) {
- if (!EnsureNotPrepare("DROP TOPIC", input->Pos(), SessionCtx->Query(), ctx)) {
- return SyncError();
- }
-
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
return SyncStatus(requireStatus);
}
auto cluster = TString(maybeDrop.Cast().DataSink().Cluster());
TString topicName = TString(maybeDrop.Cast().Topic());
+ bool missingOk = (maybeDrop.MissingOk().Cast().Value() == "1");
- bool prepareOnly = SessionCtx->Query().PrepareOnly;
- auto future = prepareOnly ? CreateDummySuccess() : (
- Gateway->DropTopic(cluster, topicName)
- );
+ auto future = Gateway->DropTopic(cluster, topicName, missingOk);
return WrapFuture(future,
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
index 2110c957525..20725891040 100644
--- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
+++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
@@ -171,7 +171,8 @@
{"Index": 2, "Name": "Topic", "Type": "TCoAtom"},
{"Index": 3, "Name": "Consumers", "Type": "TCoTopicConsumerList"},
{"Index": 4, "Name": "TopicSettings", "Type": "TCoNameValueTupleList"},
- {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"},
+ {"Index": 6, "Name": "ExistingOk", "Type": "TCoAtom"}
]
},
{
@@ -186,7 +187,9 @@
{"Index": 4, "Name": "AlterConsumers", "Type": "TCoTopicConsumerList"},
{"Index": 5, "Name": "DropConsumers", "Type": "TCoAtomList"},
{"Index": 6, "Name": "TopicSettings", "Type": "TCoNameValueTupleList"},
- {"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ {"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"},
+ {"Index": 8, "Name": "MissingOk", "Type": "TCoAtom"}
+
]
},
{
@@ -197,7 +200,9 @@
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
{"Index": 2, "Name": "Topic", "Type": "TCoAtom"},
- {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
+ {"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"}
+
]
},
{
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 6e0a6a9f60a..33991c8f026 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -14,6 +14,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/services/metadata/abstract/kqp_common.h>
#include <ydb/services/metadata/manager/abstract.h>
+#include <ydb/services/persqueue_v1/actors/events.h>
#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
@@ -708,6 +709,13 @@ struct TCreateExternalTableSettings {
TVector<std::pair<TString, TString>> SourceTypeParameters;
};
+struct TAlterTopicSettings {
+ Ydb::Topic::AlterTopicRequest Request;
+ TString Name;
+ TString WorkDir;
+ bool MissingOk;
+};
+
struct TSequenceSettings {
TMaybe<i64> MinValue;
TMaybe<i64> MaxValue;
@@ -986,11 +994,13 @@ public:
virtual NThreading::TFuture<TGenericResult> DropTable(const TString& cluster, const TDropTableSettings& settings) = 0;
- virtual NThreading::TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) = 0;
+ virtual NThreading::TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) = 0;
- virtual NThreading::TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) = 0;
+ virtual NThreading::TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(TAlterTopicSettings&& settings) = 0;
- virtual NThreading::TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) = 0;
+ virtual NThreading::TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) = 0;
virtual NThreading::TFuture<TGenericResult> CreateReplication(const TString& cluster, const TCreateReplicationSettings& settings) = 0;
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index 9a990514392..1c10f842f18 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -3118,7 +3118,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
// Shuffled
auto client = kikimr.GetQueryClient();
- {
+ {
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/ColumnShard` (Col3, Col4, Col2, Col1) VALUES
("test100", "100", 1000, 100u);
@@ -3167,7 +3167,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
auto client = kikimr.GetQueryClient();
-
+
{
auto it = client.ExecuteQuery(R"(
REPLACE INTO `/Root/DataShard` (Col1, Col2) VALUES (0u, 0);
@@ -3185,7 +3185,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(output, R"([[0u;[0];#];[1u;#;["test"]]])");
}
- {
+ {
auto it = client.ExecuteQuery(R"(
REPLACE INTO `/Root/DataShard` (Col1, Col3) VALUES (0u, 'null');
REPLACE INTO `/Root/DataShard` (Col1) VALUES (1u);
@@ -3547,7 +3547,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
auto client = kikimr.GetQueryClient();
- {
+ {
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES
(100u, 1000), (100u, 1000);
@@ -3555,7 +3555,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}
- {
+ {
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES
(100u, 1000), (100u, 1000);
@@ -3598,7 +3598,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
auto client = kikimr.GetQueryClient();
- {
+ {
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES (1u, 1)
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
@@ -4020,6 +4020,256 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
);
}
}
+
+
+ void RunQuery (const TString& query, auto& session, bool expectOk = true) {
+ auto qResult = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ if (!qResult.IsSuccess()) {
+ Cerr << "Query failed, status: " << qResult.GetStatus() << ": " << qResult.GetIssues().ToString() << Endl;
+ }
+ UNIT_ASSERT(qResult.IsSuccess() == expectOk);
+ };
+
+ struct TEntryCheck {
+ NYdb::NScheme::ESchemeEntryType Type;
+ TString Name;
+ bool IsExpected;
+ bool WasFound = false;
+ };
+
+ TEntryCheck ExpectedTopic(const TString& name) {
+ return TEntryCheck{NYdb::NScheme::ESchemeEntryType::Topic, name, true};
+ }
+ TEntryCheck UnexpectedTopic(const TString& name) {
+ return TEntryCheck{NYdb::NScheme::ESchemeEntryType::Topic, name, false};
+ }
+
+ void CheckDirEntry(TKikimrRunner& kikimr, TVector<TEntryCheck>& entriesToCheck) {
+ auto res = kikimr.GetSchemeClient().ListDirectory("/Root").GetValueSync();
+ for (const auto& entry : res.GetChildren()) {
+ Cerr << "Scheme entry: " << entry << Endl;
+ for (auto& checkEntry : entriesToCheck) {
+ if (checkEntry.Name != entry.Name)
+ continue;
+ if (checkEntry.IsExpected) {
+ UNIT_ASSERT_C(entry.Type == checkEntry.Type, checkEntry.Name);
+ checkEntry.WasFound = true;
+ } else {
+ UNIT_ASSERT_C(entry.Type != checkEntry.Type, checkEntry.Name);
+ }
+ }
+ }
+ for (auto& checkEntry : entriesToCheck) {
+ if (checkEntry.IsExpected) {
+ UNIT_ASSERT_C(checkEntry.WasFound, checkEntry.Name);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(CreateAndDropTopic) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ serverSettings.PQConfig.SetRequireCredentialsInNewProtocol(false);
+ TKikimrRunner kikimr(
+ serverSettings.SetWithSampleTables(false).SetEnableTempTables(true));
+ auto client = kikimr.GetQueryClient();
+ auto session = client.GetSession().GetValueSync().GetSession();
+ auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(),
+ NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin"));
+
+ {
+ const auto queryCreateTopic = Q_(R"(
+ --!syntax_v1
+ CREATE TOPIC `/Root/TempTopic` (CONSUMER cons1);
+ )");
+ RunQuery(queryCreateTopic, session);
+ Cerr << "Topic created\n";
+ auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetConsumers().size(), 1);
+ }
+ {
+ const auto queryCreateTopic = Q_(R"(
+ --!syntax_v1
+ CREATE TOPIC IF NOT EXISTS `/Root/TempTopic` (CONSUMER cons1, CONSUMER cons2);
+ )");
+ RunQuery(queryCreateTopic, session);
+ auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetConsumers().size(), 1);
+ }
+ {
+ const auto queryCreateTopic = Q_(R"(
+ --!syntax_v1
+ CREATE TOPIC `/Root/TempTopic` (CONSUMER cons1, CONSUMER cons2, CONSUMER cons3);
+ )");
+ RunQuery(queryCreateTopic, session, false);
+ auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetConsumers().size(), 1);
+ }
+
+ TVector<TEntryCheck> entriesToCheck = {ExpectedTopic("TempTopic")};
+ CheckDirEntry(kikimr, entriesToCheck);
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ Drop TOPIC `/Root/TempTopic`;
+ )");
+ RunQuery(query, session);
+ Cerr << "Topic dropped\n";
+ TVector<TEntryCheck> entriesToCheck = {UnexpectedTopic("TempTopic")};
+ CheckDirEntry(kikimr, entriesToCheck);
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ Drop TOPIC IF EXISTS `/Root/TempTopic`;
+ )");
+ RunQuery(query, session);
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ Drop TOPIC `/Root/TempTopic`;
+ )");
+ RunQuery(query, session, false);
+ }
+ }
+
+ Y_UNIT_TEST(CreateAndAlterTopic) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ TKikimrRunner kikimr{serverSettings};
+ auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin"));
+ auto session = client.GetSession().GetValueSync().GetSession();
+ auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(),
+ NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin"));
+
+ {
+ const auto queryCreateTopic = Q_(R"(
+ --!syntax_v1
+ CREATE TOPIC `/Root/TempTopic` (CONSUMER cons1);
+ )");
+ RunQuery(queryCreateTopic, session);
+
+ auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1);
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ ALTER TOPIC `/Root/TempTopic` SET (min_active_partitions = 10);
+ )");
+ RunQuery(query, session);
+ auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 10);
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ ALTER TOPIC IF EXISTS `/Root/TempTopic` SET (min_active_partitions = 15);
+ )");
+ RunQuery(query, session);
+ auto desc = pq.DescribeTopic("/Root/TempTopic").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 15);
+ }
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ ALTER TOPIC `/Root/NoSuchTopic` SET (min_active_partitions = 10);
+ )");
+ RunQuery(query, session, false);
+
+ TVector<TEntryCheck> entriesToCheck = {UnexpectedTopic("NoSuchTopic")};
+ CheckDirEntry(kikimr, entriesToCheck);
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ ALTER TOPIC IF EXISTS `/Root/NoSuchTopic` SET (min_active_partitions = 10);
+ )");
+ RunQuery(query, session);
+ TVector<TEntryCheck> entriesToCheck = {UnexpectedTopic("NoSuchTopic")};
+ CheckDirEntry(kikimr, entriesToCheck);
+ }
+ }
+ Y_UNIT_TEST(CreateOrDropTopicOverTable) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ TKikimrRunner kikimr{serverSettings};
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ auto tcSession = tableClient.CreateSession().GetValueSync().GetSession();
+ UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
+ CREATE TABLE `/Root/TmpTable` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync().IsSuccess());
+ tcSession.Close();
+ }
+
+ auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin"));
+ auto session = client.GetSession().GetValueSync().GetSession();
+
+ TVector<TEntryCheck> entriesToCheck = {TEntryCheck{.Type = NYdb::NScheme::ESchemeEntryType::Table,
+ .Name = "TmpTable", .IsExpected = true}};
+ {
+ const auto queryCreateTopic = Q_(R"(
+ --!syntax_v1
+ CREATE TOPIC `/Root/TmpTable` (CONSUMER cons1);
+ )");
+ RunQuery(queryCreateTopic, session, false);
+ CheckDirEntry(kikimr, entriesToCheck);
+
+ }
+ {
+ const auto queryCreateTopic = Q_(R"(
+ --!syntax_v1
+ CREATE TOPIC IF NOT EXISTS `/Root/TmpTable` (CONSUMER cons1);
+ )");
+ RunQuery(queryCreateTopic, session, false);
+ CheckDirEntry(kikimr, entriesToCheck);
+ }
+ {
+ const auto queryDropTopic = Q_(R"(
+ --!syntax_v1
+ DROP TOPIC `/Root/TmpTable`;
+ )");
+ RunQuery(queryDropTopic, session, false);
+ }
+ {
+ const auto queryDropTopic = Q_(R"(
+ --!syntax_v1
+ DROP TOPIC IF EXISTS `/Root/TmpTable`;
+ )");
+ RunQuery(queryDropTopic, session, false);
+ CheckDirEntry(kikimr, entriesToCheck);
+ }
+ {
+ auto tcSession = tableClient.CreateSession().GetValueSync().GetSession();
+ auto type = TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Uint64).EndOptional().Build();
+ auto alter = NYdb::NTable::TAlterTableSettings().AppendAddColumns(TColumn("NewColumn", type));
+
+ auto alterResult = tcSession.AlterTable("/Root/TmpTable", alter
+ ).GetValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index d2bbb91c5cc..70d0391e283 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -457,6 +457,9 @@ message TKqpSchemeOperation {
NKikimrSchemeOp.TModifyScheme AlterResourcePool = 39;
NKikimrSchemeOp.TModifyScheme DropResourcePool = 40;
TKqpAnalyzeOperation AnalyzeTable = 41;
+ NKikimrSchemeOp.TModifyScheme CreateTopic = 42;
+ NKikimrSchemeOp.TModifyScheme AlterTopic = 43;
+ NKikimrSchemeOp.TModifyScheme DropTopic = 44;
}
}
diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp
index e66563d4872..28e7497d252 100644
--- a/ydb/core/tablet/tablet_counters_aggregator.cpp
+++ b/ydb/core/tablet/tablet_counters_aggregator.cpp
@@ -159,7 +159,7 @@ public:
}
for (ui32 i = 0, e = labeledCounters->GetCounters().Size(); i < e; ++i) {
- if(!strlen(labeledCounters->GetCounterName(i)))
+ if(!strlen(labeledCounters->GetCounterName(i)))
continue;
const ui64& value = labeledCounters->GetCounters()[i].Get();
const ui64& id = labeledCounters->GetIds()[i].Get();
@@ -1501,7 +1501,8 @@ TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvTabletLabeledCo
continue;
}
if (groupNames[j] == "Client") {
- group = group->GetSubgroup("ConsumerPath", NPersQueue::ConvertOldConsumerName(groups[j], ctx));
+ group = group->GetSubgroup("ConsumerPath",
+ NPersQueue::ConvertOldConsumerName(groups[j], AppData(ctx)->PQConfig));
continue;
}
}
@@ -2095,7 +2096,7 @@ public:
if (groups.size() == 1) { //topic case
ff = groups[0];
} else if (groups.size() == 3) { //client important topic
- res = NPersQueue::ConvertOldConsumerName(groups[0], ctx) + "|" + groups[1] + "|";
+ res = NPersQueue::ConvertOldConsumerName(groups[0], AppData(ctx)->PQConfig) + "|" + groups[1] + "|";
ff = groups[2];
} else {
continue;
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp
index dec46a7d803..c71c9ac6084 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.cpp
+++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp
@@ -75,22 +75,31 @@ TString GetFullTopicPath(const NActors::TActorContext& ctx, const TMaybe<TString
}
}
-TString ConvertNewConsumerName(const TString& consumer, const NActors::TActorContext& ctx) {
- if (NKikimr::AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+TString ConvertNewConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig) {
+ if (pqConfig.GetTopicsAreFirstClassCitizen()) {
return consumer;
} else {
return ConvertNewConsumerName(consumer);
}
}
-TString ConvertOldConsumerName(const TString& consumer, const NActors::TActorContext& ctx) {
- if (NKikimr::AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+TString ConvertNewConsumerName(const TString& consumer, const NActors::TActorContext& ctx) {
+ return ConvertNewConsumerName(consumer, NKikimr::AppData(ctx)->PQConfig);
+}
+
+TString ConvertOldConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig) {
+ if (pqConfig.GetTopicsAreFirstClassCitizen()) {
return consumer;
} else {
return ConvertOldConsumerName(consumer);
}
}
+TString ConvertOldConsumerName(const TString& consumer, const NActors::TActorContext& ctx) {
+ return ConvertOldConsumerName(consumer, NKikimr::AppData(ctx)->PQConfig);
+}
+
+
TString MakeConsumerPath(const TString& consumer) {
TStringBuilder res;
res.reserve(consumer.size());
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 95d48657293..9ca5809460b 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -18,7 +18,9 @@ namespace NKikimr::NMsgBusProxy::NPqMetaCacheV2 {
namespace NPersQueue {
TString GetFullTopicPath(const NActors::TActorContext& ctx, const TMaybe<TString>& database, const TString& topicPath);
+TString ConvertNewConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig);
TString ConvertNewConsumerName(const TString& consumer, const NActors::TActorContext& ctx);
+TString ConvertOldConsumerName(const TString& consumer, const NKikimrPQ::TPQConfig& pqConfig);
TString ConvertOldConsumerName(const TString& consumer, const NActors::TActorContext& ctx);
TString MakeConsumerPath(const TString& consumer);
diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in
index 838d5608013..3af06e1158c 100644
--- a/ydb/library/yql/sql/v1/SQLv1.g.in
+++ b/ydb/library/yql/sql/v1/SQLv1.g.in
@@ -915,7 +915,7 @@ multiple_column_assignment: set_target_list EQUALS LPAREN simple_values_source R
set_target_list: LPAREN set_target (COMMA set_target)* RPAREN;
// topics
-create_topic_stmt: CREATE TOPIC topic_ref create_topic_entries? with_topic_settings?;
+create_topic_stmt: CREATE TOPIC (IF NOT EXISTS)? topic_ref create_topic_entries? with_topic_settings?;
create_topic_entries: LPAREN create_topic_entry (COMMA create_topic_entry)* RPAREN;
create_topic_entry:
@@ -923,7 +923,7 @@ create_topic_entry:
;
with_topic_settings: WITH LPAREN topic_settings RPAREN;
-alter_topic_stmt: ALTER TOPIC topic_ref alter_topic_action (COMMA alter_topic_action)*;
+alter_topic_stmt: ALTER TOPIC (IF EXISTS)? topic_ref alter_topic_action (COMMA alter_topic_action)*;
alter_topic_action:
alter_topic_add_consumer
| alter_topic_alter_consumer
@@ -949,7 +949,7 @@ topic_alter_consumer_reset: RESET LPAREN an_id (COMMA an_id)* RPAREN;
alter_topic_set_settings: SET LPAREN topic_settings RPAREN;
alter_topic_reset_settings: RESET LPAREN an_id (COMMA an_id_pure)* RPAREN;
-drop_topic_stmt: DROP TOPIC topic_ref;
+drop_topic_stmt: DROP TOPIC (IF EXISTS)? topic_ref;
topic_settings: topic_settings_entry (COMMA topic_settings_entry)*;
topic_settings_entry: an_id EQUALS topic_setting_value;
diff --git a/ydb/library/yql/sql/v1/format/sql_format.cpp b/ydb/library/yql/sql/v1/format/sql_format.cpp
index 8ebff4d1df8..88a35bd7bd9 100644
--- a/ydb/library/yql/sql/v1/format/sql_format.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format.cpp
@@ -1385,10 +1385,11 @@ private:
NewLine();
VisitKeyword(msg.GetToken1());
VisitKeyword(msg.GetToken2());
- Visit(msg.GetRule_topic_ref3());
- if (msg.HasBlock4()) {
+ Visit(msg.GetBlock3());
+ Visit(msg.GetRule_topic_ref4());
+ if (msg.HasBlock5()) {
PushCurrentIndent();
- auto& b = msg.GetBlock4().GetRule_create_topic_entries1();
+ auto& b = msg.GetBlock5().GetRule_create_topic_entries1();
Visit(b.GetToken1());
NewLine();
Visit(b.GetRule_create_topic_entry2());
@@ -1401,8 +1402,8 @@ private:
PopCurrentIndent();
Visit(b.GetToken4());
}
- if (msg.HasBlock5()) {
- auto& b = msg.GetBlock5().GetRule_with_topic_settings1();
+ if (msg.HasBlock6()) {
+ auto& b = msg.GetBlock6().GetRule_with_topic_settings1();
VisitKeyword(b.GetToken1());
VisitKeyword(b.GetToken2());
PushCurrentIndent();
@@ -1419,11 +1420,12 @@ private:
NewLine();
VisitKeyword(msg.GetToken1());
VisitKeyword(msg.GetToken2());
- Visit(msg.GetRule_topic_ref3());
+ Visit(msg.GetBlock3());
+ Visit(msg.GetRule_topic_ref4());
NewLine();
PushCurrentIndent();
- Visit(msg.GetRule_alter_topic_action4());
- for (auto& b : msg.GetBlock5()) {
+ Visit(msg.GetRule_alter_topic_action5());
+ for (auto& b : msg.GetBlock6()) {
Visit(b.GetToken1());
NewLine();
Visit(b.GetRule_alter_topic_action2());
diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
index 3e386b5d2dd..fedbf0dd625 100644
--- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
@@ -284,7 +284,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
{"create table user(index user local on (user) cover (user,user))",
"CREATE TABLE user (\n\tINDEX user LOCAL ON (user) COVER (user, user)\n);\n"},
{"create table user(index idx global using subtype on (col) cover (col) with (setting = foo, another_setting = bar));",
- "CREATE TABLE user (\n\tINDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = bar)\n);\n"},
+ "CREATE TABLE user (\n\tINDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = bar)\n);\n"},
{"create table user(family user (user='foo'))",
"CREATE TABLE user (\n\tFAMILY user (user = 'foo')\n);\n"},
{"create table user(family user (user='foo',user='bar'))",
@@ -465,7 +465,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
{"alter table user alter index idx reset (setting, another_setting)",
"ALTER TABLE user\n\tALTER INDEX idx RESET (setting, another_setting);\n"},
{"alter table user add index idx global using subtype on (col) cover (col) with (setting = foo, another_setting = 'bar');",
- "ALTER TABLE user\n\tADD INDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = 'bar');\n"},
+ "ALTER TABLE user\n\tADD INDEX idx GLOBAL USING subtype ON (col) COVER (col) WITH (setting = foo, another_setting = 'bar');\n"},
{"alter table user drop index user",
"ALTER TABLE user\n\tDROP INDEX user;\n"},
{"alter table user rename to user",
@@ -513,6 +513,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
TSetup setup;
setup.Run(cases);
}
+
Y_UNIT_TEST(AlterTopic) {
TCases cases = {
{"alter topic topic1 alter consumer c1 set (important = false)",
@@ -529,6 +530,7 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
TSetup setup;
setup.Run(cases);
}
+
Y_UNIT_TEST(DropTopic) {
TCases cases = {
{"drop topic topic1",
@@ -539,6 +541,20 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
setup.Run(cases);
}
+ Y_UNIT_TEST(TopicExistsStatement) {
+ TCases cases = {
+ {"drop topic if exists topic1",
+ "DROP TOPIC IF EXISTS topic1;\n"},
+ {"create topic if not exists topic1 with (partition_count_limit = 5)",
+ "CREATE TOPIC IF NOT EXISTS topic1 WITH (\n\tpartition_count_limit = 5\n);\n"},
+ {"alter topic if exists topic1 alter consumer c1 set (important = false)",
+ "ALTER TOPIC IF EXISTS topic1\n\tALTER CONSUMER c1 SET (important = FALSE);\n"},
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+ }
+
Y_UNIT_TEST(Do) {
TCases cases = {
{"do $a(1,2,3)",
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index 0e9ca3f7b90..b5c3272f53d 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -1334,6 +1334,7 @@ namespace NSQLTranslationV1 {
struct TCreateTopicParameters {
TVector<TTopicConsumerDescription> Consumers;
TTopicSettings TopicSettings;
+ bool ExistingOk;
};
struct TAlterTopicParameters {
@@ -1341,6 +1342,11 @@ namespace NSQLTranslationV1 {
THashMap<TString, TTopicConsumerDescription> AlterConsumers;
TVector<TIdentifier> DropConsumers;
TTopicSettings TopicSettings;
+ bool MissingOk;
+ };
+
+ struct TDropTopicParameters {
+ bool MissingOk;
};
TString IdContent(TContext& ctx, const TString& str);
@@ -1475,7 +1481,8 @@ namespace NSQLTranslationV1 {
TScopedStatePtr scoped);
TNodePtr BuildAlterTopic(TPosition pos, const TTopicRef& tr, const TAlterTopicParameters& params,
TScopedStatePtr scoped);
- TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& topic, TScopedStatePtr scoped);
+ TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& topic, const TDropTopicParameters& params,
+ TScopedStatePtr scoped);
template<class TContainer>
TMaybe<TString> FindMistypeIn(const TContainer& container, const TString& name) {
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index cdd7ba0ef74..00e42bacde0 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -1630,7 +1630,8 @@ public:
}
auto opts = Y();
- opts = L(opts, Q(Y(Q("mode"), Q("create"))));
+ TString mode = Params.ExistingOk ? "create_if_not_exists" : "create";
+ opts = L(opts, Q(Y(Q("mode"), Q(mode))));
for (const auto& consumer : Params.Consumers) {
const auto& desc = CreateConsumerDesc(consumer, *this, false);
@@ -1741,7 +1742,8 @@ public:
}
auto opts = Y();
- opts = L(opts, Q(Y(Q("mode"), Q("alter"))));
+ TString mode = Params.MissingOk ? "alter_if_exists" : "alter";
+ opts = L(opts, Q(Y(Q("mode"), Q(mode))));
for (const auto& consumer : Params.AddConsumers) {
const auto& desc = CreateConsumerDesc(consumer, *this, false);
@@ -1815,9 +1817,10 @@ TNodePtr BuildAlterTopic(
class TDropTopicNode final: public TAstListNode {
public:
- TDropTopicNode(TPosition pos, const TTopicRef& tr, TScopedStatePtr scoped)
+ TDropTopicNode(TPosition pos, const TTopicRef& tr, const TDropTopicParameters& params, TScopedStatePtr scoped)
: TAstListNode(pos)
, Topic(tr)
+ , Params(params)
, Scoped(scoped)
{
scoped->UseCluster(TString(KikimrProviderName), Topic.Cluster);
@@ -1832,7 +1835,8 @@ public:
auto opts = Y();
- opts = L(opts, Q(Y(Q("mode"), Q("drop"))));
+ TString mode = Params.MissingOk ? "drop_if_exists" : "drop";
+ opts = L(opts, Q(Y(Q("mode"), Q(mode))));
Add("block", Q(Y(
@@ -1850,12 +1854,13 @@ public:
}
private:
TTopicRef Topic;
+ TDropTopicParameters Params;
TScopedStatePtr Scoped;
TSourcePtr FakeSource;
};
-TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& tr, TScopedStatePtr scoped) {
- return new TDropTopicNode(pos, tr, scoped);
+TNodePtr BuildDropTopic(TPosition pos, const TTopicRef& tr, const TDropTopicParameters& params, TScopedStatePtr scoped) {
+ return new TDropTopicNode(pos, tr, params, scoped);
}
class TCreateRole final: public TAstListNode {
diff --git a/ydb/library/yql/sql/v1/sql_query.cpp b/ydb/library/yql/sql/v1/sql_query.cpp
index d4e2c1bed86..066672b7c5c 100644
--- a/ydb/library/yql/sql/v1/sql_query.cpp
+++ b/ydb/library/yql/sql/v1/sql_query.cpp
@@ -1002,16 +1002,21 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
case TRule_sql_stmt_core::kAltSqlStmtCore35: {
Ctx.BodyPart();
- // create_topic_stmt: CREATE TOPIC topic1 (CONSUMER ...)? [WITH (opt1 = val1, ...]?
+ // create_topic_stmt: CREATE TOPIC (IF NOT EXISTS)? topic1 (CONSUMER ...)? [WITH (opt1 = val1, ...]?
auto& rule = core.GetAlt_sql_stmt_core35().GetRule_create_topic_stmt1();
TTopicRef tr;
- if (!TopicRefImpl(rule.GetRule_topic_ref3(), tr)) {
+ if (!TopicRefImpl(rule.GetRule_topic_ref4(), tr)) {
return false;
}
+ bool existingOk = false;
+ if (rule.HasBlock3()) { // if not exists
+ existingOk = true;
+ }
TCreateTopicParameters params;
- if (rule.HasBlock4()) { //create_topic_entry (consumers)
- auto& entries = rule.GetBlock4().GetRule_create_topic_entries1();
+ params.ExistingOk = existingOk;
+ if (rule.HasBlock5()) { //create_topic_entry (consumers)
+ auto& entries = rule.GetBlock5().GetRule_create_topic_entries1();
auto& firstEntry = entries.GetRule_create_topic_entry2();
if (!CreateTopicEntry(firstEntry, params)) {
return false;
@@ -1024,8 +1029,8 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- if (rule.HasBlock5()) { // with_topic_settings
- auto& topic_settings_node = rule.GetBlock5().GetRule_with_topic_settings1().GetRule_topic_settings3();
+ if (rule.HasBlock6()) { // with_topic_settings
+ auto& topic_settings_node = rule.GetBlock6().GetRule_with_topic_settings1().GetRule_topic_settings3();
CreateTopicSettings(topic_settings_node, params.TopicSettings);
}
@@ -1034,20 +1039,26 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
case TRule_sql_stmt_core::kAltSqlStmtCore36: {
// alter_topic_stmt: ALTER TOPIC topic_ref alter_topic_action (COMMA alter_topic_action)*;
+// alter_topic_stmt: ALTER TOPIC IF EXISTS topic_ref alter_topic_action (COMMA alter_topic_action)*;
Ctx.BodyPart();
auto& rule = core.GetAlt_sql_stmt_core36().GetRule_alter_topic_stmt1();
TTopicRef tr;
- if (!TopicRefImpl(rule.GetRule_topic_ref3(), tr)) {
+ bool missingOk = false;
+ if (rule.HasBlock3()) { // IF EXISTS
+ missingOk = true;
+ }
+ if (!TopicRefImpl(rule.GetRule_topic_ref4(), tr)) {
return false;
}
TAlterTopicParameters params;
- auto& firstEntry = rule.GetRule_alter_topic_action4();
+ params.MissingOk = missingOk;
+ auto& firstEntry = rule.GetRule_alter_topic_action5();
if (!AlterTopicAction(firstEntry, params)) {
return false;
}
- const auto& list = rule.GetBlock5();
+ const auto& list = rule.GetBlock6();
for (auto& node : list) {
if (!AlterTopicAction(node.GetRule_alter_topic_action2(), params)) {
return false;
@@ -1058,15 +1069,22 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore37: {
- // drop_topic_stmt: DROP TOPIC
+ // drop_topic_stmt: DROP TOPIC (IF EXISTS)? topic_ref;
Ctx.BodyPart();
const auto& rule = core.GetAlt_sql_stmt_core37().GetRule_drop_topic_stmt1();
+ TDropTopicParameters params;
+ if (rule.HasBlock3()) { // IF EXISTS
+ params.MissingOk = true;
+ } else {
+ params.MissingOk = false;
+ }
+
TTopicRef tr;
- if (!TopicRefImpl(rule.GetRule_topic_ref3(), tr)) {
+ if (!TopicRefImpl(rule.GetRule_topic_ref4(), tr)) {
return false;
}
- AddStatementToBlocks(blocks, BuildDropTopic(Ctx.Pos(), tr, Ctx.Scoped));
+ AddStatementToBlocks(blocks, BuildDropTopic(Ctx.Pos(), tr, params, Ctx.Scoped));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore38: {
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 69e1a813b8a..e1fd2934f7e 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -170,20 +170,20 @@ namespace NKikimr::NDataStreams::V1 {
break;
default:
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- "streams can't be created with unknown metering mode", ctx);
+ "streams can't be created with unknown metering mode");
}
}
} else {
if (GetProtoRequest()->has_stream_mode_details()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- "streams can't be created with metering mode", ctx);
+ "streams can't be created with metering mode");
}
}
if (GetProtoRequest()->has_partitioning_settings()) {
auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings());
if (!r.empty()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r);
}
auto& s = GetProtoRequest()->partitioning_settings();
@@ -214,10 +214,10 @@ namespace NKikimr::NDataStreams::V1 {
pqDescr->SetPartitionPerTablet(1);
TString error;
- TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error,
+ TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, AppData(ctx), error,
workingDir, proposal.Record.GetDatabaseName());
if (codes.YdbCode != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx);
+ return ReplyWithError(codes.YdbCode, codes.PQCode, error);
}
}
@@ -229,8 +229,7 @@ namespace NKikimr::NDataStreams::V1 {
{
return ReplyWithError(Ydb::StatusIds::ALREADY_EXISTS,
static_cast<size_t>(NYds::EErrorCodes::IN_USE),
- TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " already exists",
- ctx);
+ TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " already exists");
}
return TBase::TBase::Handle(ev, ctx);
}
@@ -297,7 +296,7 @@ namespace NKikimr::NDataStreams::V1 {
if (NPQ::ConsumerCount(pqGroupDescription.GetPQTabletConfig()) > 0 && EnforceDeletion == false) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::IN_USE),
TStringBuilder() << "Stream has registered consumers" <<
- "and EnforceConsumerDeletion flag is false", ActorContext());
+ "and EnforceConsumerDeletion flag is false");
}
SendProposeRequest(ActorContext());
@@ -314,10 +313,10 @@ namespace NKikimr::NDataStreams::V1 {
}
void Bootstrap(const TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
};
void TUpdateShardCountActor::Bootstrap(const TActorContext& ctx) {
@@ -327,16 +326,17 @@ namespace NKikimr::NDataStreams::V1 {
}
void TUpdateShardCountActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(selfInfo);
+ Y_UNUSED(appData);
TString error;
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), error, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), error);
}
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
@@ -354,10 +354,10 @@ namespace NKikimr::NDataStreams::V1 {
}
void Bootstrap(const TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
};
void TUpdateStreamModeActor::Bootstrap(const TActorContext& ctx) {
@@ -367,16 +367,16 @@ namespace NKikimr::NDataStreams::V1 {
}
void TUpdateStreamModeActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(selfInfo);
Y_UNUSED(pqGroupDescription);
- if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ if (!appData->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- "streams can't be created with metering mode", ctx);
+ "streams can't be created with metering mode");
}
switch(GetProtoRequest()->stream_mode_details().stream_mode()) {
@@ -388,7 +388,7 @@ namespace NKikimr::NDataStreams::V1 {
break;
default:
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- "streams can't be created with unknown metering mode", ctx);
+ "streams can't be created with unknown metering mode");
}
}
@@ -406,7 +406,7 @@ namespace NKikimr::NDataStreams::V1 {
}
void Bootstrap(const TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo);
@@ -419,7 +419,7 @@ namespace NKikimr::NDataStreams::V1 {
}
void TUpdateStreamActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
@@ -430,7 +430,7 @@ namespace NKikimr::NDataStreams::V1 {
if (!GetProtoRequest()->has_partitioning_settings()) {
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
{
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
}
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
@@ -451,15 +451,16 @@ namespace NKikimr::NDataStreams::V1 {
default: {}
}
- auto* pqConfig = groupConfig.MutablePQTabletConfig();
+ auto* tabletConfig = groupConfig.MutablePQTabletConfig();
- pqConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(
+ tabletConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(
PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec()));
+ const auto& pqConfig = appData->PQConfig;
if (GetProtoRequest()->has_stream_mode_details()) {
- if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ if (!pqConfig.GetBillingMeteringConfig().GetEnabled()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- "streams can't be created with metering mode", ctx);
+ "streams can't be created with metering mode");
}
switch(GetProtoRequest()->stream_mode_details().stream_mode()) {
@@ -471,14 +472,14 @@ namespace NKikimr::NDataStreams::V1 {
break;
default:
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- "streams can't be created with unknown metering mode", ctx);
+ "streams can't be created with unknown metering mode");
}
}
if (GetProtoRequest()->has_partitioning_settings()) {
auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings());
if (!r.empty()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r);
}
auto& s = GetProtoRequest()->partitioning_settings();
@@ -519,12 +520,13 @@ namespace NKikimr::NDataStreams::V1 {
t->SetScaleDownPartitionWriteSpeedThresholdPercent(ws.down_utilization_percent() ? ws.down_utilization_percent() : 30);
}
- auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
+ auto serviceTypes = GetSupportedClientServiceTypes(pqConfig);
+ auto status = CheckConfig(*tabletConfig, serviceTypes, error, pqConfig,
+ Ydb::StatusIds::ALREADY_EXISTS);
if (status != Ydb::StatusIds::SUCCESS) {
return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) :
static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
- error, ctx);
+ error);
}
}
@@ -541,7 +543,7 @@ namespace NKikimr::NDataStreams::V1 {
}
void Bootstrap(const TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo);
@@ -554,7 +556,7 @@ namespace NKikimr::NDataStreams::V1 {
}
void TSetWriteQuotaActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
@@ -564,17 +566,18 @@ namespace NKikimr::NDataStreams::V1 {
TString error;
- auto* pqConfig = groupConfig.MutablePQTabletConfig();
+ auto* tabletConfig = groupConfig.MutablePQTabletConfig();
- pqConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB);
- pqConfig->MutablePartitionConfig()->SetBurstSize(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB);
+ tabletConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB);
+ tabletConfig->MutablePartitionConfig()->SetBurstSize(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB);
- auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
+ const auto& pqConfig = appData->PQConfig;
+ auto serviceTypes = GetSupportedClientServiceTypes(pqConfig);
+ auto status = CheckConfig(*tabletConfig, serviceTypes, error, pqConfig, Ydb::StatusIds::ALREADY_EXISTS);
if (status != Ydb::StatusIds::SUCCESS) {
return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS? static_cast<size_t>(NYds::EErrorCodes::IN_USE) :
static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
- error, ctx);
+ error);
}
}
@@ -600,7 +603,7 @@ namespace NKikimr::NDataStreams::V1 {
}
void ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
@@ -611,9 +614,9 @@ namespace NKikimr::NDataStreams::V1 {
TString error;
Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
- auto* pqConfig = groupConfig.MutablePQTabletConfig();
+ auto* tabletConfig = groupConfig.MutablePQTabletConfig();
- ui32 currentLifetime = pqConfig->GetPartitionConfig().GetLifetimeSeconds();
+ ui32 currentLifetime = tabletConfig->GetPartitionConfig().GetLifetimeSeconds();
ui32 newLifetime = TInstant::Hours(this->GetProtoRequest()->retention_period_hours()).Seconds();
if (ShouldIncrease) {
if (newLifetime <= currentLifetime) {
@@ -628,16 +631,18 @@ namespace NKikimr::NDataStreams::V1 {
status = Ydb::StatusIds::BAD_REQUEST;
}
}
+ const auto& pqConfig = appData->PQConfig;
if (status == Ydb::StatusIds::SUCCESS) {
- pqConfig->MutablePartitionConfig()->SetLifetimeSeconds(newLifetime);
+ tabletConfig->MutablePartitionConfig()->SetLifetimeSeconds(newLifetime);
- auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
+ auto serviceTypes = GetSupportedClientServiceTypes(pqConfig);
+ status = CheckConfig(*tabletConfig, serviceTypes, error,
+ pqConfig, Ydb::StatusIds::ALREADY_EXISTS);
}
if (status != Ydb::StatusIds::SUCCESS) {
return TBase::ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) :
- static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), error, ctx);
+ static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), error);
}
}
@@ -662,16 +667,16 @@ namespace NKikimr::NDataStreams::V1 {
void StateWork(TAutoPtr<IEventHandle>& ev);
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
- TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId);
}
}
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) {
ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
- TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId);
}
void Handle(TEvPersQueue::TEvOffsetsResponse::TPtr& ev, const TActorContext& ctx) {
@@ -866,7 +871,7 @@ namespace NKikimr::NDataStreams::V1 {
}
if (!startShardFound) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
- TStringBuilder() << "Bad shard id " << GetProtoRequest()->exclusive_start_shard_id(), ctx);
+ TStringBuilder() << "Bad shard id " << GetProtoRequest()->exclusive_start_shard_id());
}
return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx);
}
@@ -1084,15 +1089,15 @@ namespace NKikimr::NDataStreams::V1 {
if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_arn().empty()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION),
- TStringBuilder() << "StreamArn and NextToken can not be provided together", ctx);
+ TStringBuilder() << "StreamArn and NextToken can not be provided together");
}
if (NextToken.IsExpired()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN),
- TStringBuilder() << "Provided NextToken is expired", ctx);
+ TStringBuilder() << "Provided NextToken is expired");
}
if (!NextToken.IsValid()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- TStringBuilder() << "Provided NextToken is malformed", ctx);
+ TStringBuilder() << "Provided NextToken is malformed");
}
auto maxResultsInRange = MIN_MAX_RESULTS <= MaxResults && MaxResults <= MAX_MAX_RESULTS;
@@ -1100,7 +1105,7 @@ namespace NKikimr::NDataStreams::V1 {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
TStringBuilder() << "Requested max_result value '" << MaxResults <<
"' is out of range [" << MIN_MAX_RESULTS << ", " << MAX_MAX_RESULTS <<
- "]", ctx);
+ "]");
}
SendDescribeProposeRequest(ctx);
Become(&TListStreamConsumersActor::StateWork);
@@ -1132,7 +1137,7 @@ namespace NKikimr::NDataStreams::V1 {
if (alreadyRead > consumerCount) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Provided next_token is malformed - " <<
- "everything is already read", ActorContext());
+ "everything is already read");
}
const auto rulesToRead = std::min(consumerCount - alreadyRead, MaxResults);
@@ -1180,10 +1185,10 @@ namespace NKikimr::NDataStreams::V1 {
~TRegisterStreamConsumerActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
void OnNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) override;
private:
@@ -1202,14 +1207,14 @@ namespace NKikimr::NDataStreams::V1 {
}
void TRegisterStreamConsumerActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(pqGroupDescription);
- auto* pqConfig = groupConfig.MutablePQTabletConfig();
+ auto* tabletConfig = groupConfig.MutablePQTabletConfig();
Ydb::PersQueue::V1::TopicSettings::ReadRule readRule;
readRule.set_consumer_name(ConsumerName);
readRule.set_supported_format(Ydb::PersQueue::V1::TopicSettings_Format_FORMAT_BASE);
@@ -1220,14 +1225,15 @@ namespace NKikimr::NDataStreams::V1 {
if (readRule.version() == 0) {
readRule.set_version(selfInfo.GetVersion().GetPQVersion());
}
- auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- auto messageAndCode = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx);
- size_t issueCode = static_cast<size_t>(messageAndCode.PQCode);
+ const auto& pqConfig = appData->PQConfig;
+ auto serviceTypes = GetSupportedClientServiceTypes(pqConfig);
+ auto messageAndCode = AddReadRuleToConfig(tabletConfig, readRule, serviceTypes, pqConfig);
+ size_t issueCode = static_cast<size_t>(messageAndCode.PQCode);
Ydb::StatusIds::StatusCode status;
if (messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK) {
- status = CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS);
+ status = CheckConfig(*tabletConfig, serviceTypes, messageAndCode.Message, pqConfig, Ydb::StatusIds::ALREADY_EXISTS);
if (status == Ydb::StatusIds::ALREADY_EXISTS) {
issueCode = static_cast<size_t>(NYds::EErrorCodes::IN_USE);
}
@@ -1236,7 +1242,7 @@ namespace NKikimr::NDataStreams::V1 {
}
if (status != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(status, issueCode, messageAndCode.Message, ctx);
+ return ReplyWithError(status, issueCode, messageAndCode.Message);
}
}
@@ -1265,10 +1271,10 @@ namespace NKikimr::NDataStreams::V1 {
~TDeregisterStreamConsumerActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
private:
TString ConsumerName;
@@ -1286,7 +1292,7 @@ namespace NKikimr::NDataStreams::V1 {
}
void TDeregisterStreamConsumerActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
@@ -1296,10 +1302,10 @@ namespace NKikimr::NDataStreams::V1 {
groupConfig.MutablePQTabletConfig(),
pqGroupDescription.GetPQTabletConfig(),
GetProtoRequest()->consumer_name(),
- ctx
+ appData->PQConfig
);
if (!error.Empty()) {
- return ReplyWithError(Ydb::StatusIds::NOT_FOUND, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), error, ctx);
+ return ReplyWithError(Ydb::StatusIds::NOT_FOUND, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), error);
}
}
@@ -1352,7 +1358,7 @@ namespace NKikimr::NDataStreams::V1 {
auto sn = SequenceNumberToInt(GetProtoRequest()->starting_sequence_number());
if (!sn) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- TStringBuilder() << "Malformed sequence number", ctx);
+ TStringBuilder() << "Malformed sequence number");
}
SequenceNumber = sn.value() + (IteratorType == TIteratorType::AFTER_SEQUENCE_NUMBER ? 1u : 0u);
}
@@ -1361,13 +1367,13 @@ namespace NKikimr::NDataStreams::V1 {
if (GetProtoRequest()->timestamp() == 0) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard iterator type is AT_TIMESTAMP, " <<
- "but a timestamp is missed", ctx);
+ "but a timestamp is missed");
}
if (GetProtoRequest()->timestamp() > static_cast<i64>(TInstant::Now().MilliSeconds()) + TIMESTAMP_DELTA_ALLOWED_MS) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard iterator type is AT_TIMESTAMP, " <<
- "but a timestamp is in the future", ctx);
+ "but a timestamp is in the future");
}
ReadTimestampMs = GetProtoRequest()->timestamp();
break;
@@ -1380,7 +1386,7 @@ namespace NKikimr::NDataStreams::V1 {
default:
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard iterator type '" <<
- (ui32)IteratorType << "' is not known", ctx);
+ (ui32)IteratorType << "' is not known");
}
@@ -1412,7 +1418,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << token.GetUserSID(), ActorContext());
+ << token.GetUserSID());
}
}
@@ -1433,7 +1439,7 @@ namespace NKikimr::NDataStreams::V1 {
}
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND),
- TStringBuilder() << "No such shard: " << ShardId, ActorContext());
+ TStringBuilder() << "No such shard: " << ShardId);
}
void TGetShardIteratorActor::SendResponse(const TActorContext& ctx, const TShardIterator& shardIt) {
@@ -1507,17 +1513,17 @@ namespace NKikimr::NDataStreams::V1 {
if (ShardIterator.IsExpired()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_ITERATOR),
- TStringBuilder() << "Provided shard iterator is expired", ctx);
+ TStringBuilder() << "Provided shard iterator is expired");
}
if (!ShardIterator.IsValid()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- TStringBuilder() << "Provided shard iterator is malformed", ctx);
+ TStringBuilder() << "Provided shard iterator is malformed");
}
Limit = Limit == 0 ? MAX_LIMIT : Limit;
if (Limit < 1 || Limit > MAX_LIMIT) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
- TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx);
+ TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]");
}
SendDescribeProposeRequest(ctx, ShardIterator.IsCdcTopic());
@@ -1579,7 +1585,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access to stream "
<< ShardIterator.GetStreamName()
<< " is denied for subject "
- << token.GetUserSID(), ActorContext());
+ << token.GetUserSID());
}
}
@@ -1597,7 +1603,7 @@ namespace NKikimr::NDataStreams::V1 {
}
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND),
- TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ActorContext());
+ TStringBuilder() << "No such shard: " << ShardIterator.GetShardId());
}
void TGetRecordsActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
@@ -1619,7 +1625,7 @@ namespace NKikimr::NDataStreams::V1 {
default:
return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()),
ConvertOldCode(record.GetErrorCode()),
- record.GetErrorReason(), ctx);
+ record.GetErrorReason());
}
break;
default: {}
@@ -1661,16 +1667,16 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
- TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId);
}
}
- void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) {
ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
- TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId);
}
void TGetRecordsActor::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
@@ -1769,21 +1775,21 @@ namespace NKikimr::NDataStreams::V1 {
if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_name().empty()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION),
- TStringBuilder() << "StreamName and NextToken can not be provided together", ctx);
+ TStringBuilder() << "StreamName and NextToken can not be provided together");
}
if (NextToken.IsExpired()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN),
- TStringBuilder() << "Provided next token is expired", ctx);
+ TStringBuilder() << "Provided next token is expired");
}
if (!NextToken.IsValid()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
- TStringBuilder() << "Provided next token is malformed", ctx);
+ TStringBuilder() << "Provided next token is malformed");
}
if (!TShardFilter::ShardFilterType_IsValid(ShardFilter.type())) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard filter '" <<
- (ui32)ShardFilter.type() << "' is not known", ctx);
+ (ui32)ShardFilter.type() << "' is not known");
}
MaxResults = MaxResults == 0 ? DEFAULT_MAX_RESULTS : MaxResults;
@@ -1791,13 +1797,13 @@ namespace NKikimr::NDataStreams::V1 {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
TStringBuilder() << "Max results '" << MaxResults <<
"' is out of bound [" << MIN_MAX_RESULTS << "; " <<
- MAX_MAX_RESULTS << "]", ctx);
+ MAX_MAX_RESULTS << "]");
}
if (ShardFilter.type() == TShardFilter::AFTER_SHARD_ID && ShardFilter.shard_id() == "") {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::MISSING_PARAMETER),
TStringBuilder() << "Shard filter type is AFTER_SHARD_ID," <<
- " but no ShardId provided", ctx);
+ " but no ShardId provided");
}
SendDescribeProposeRequest(ctx);
@@ -1831,7 +1837,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << token.GetUserSID(), ActorContext());
+ << token.GetUserSID());
}
}
@@ -1888,7 +1894,7 @@ namespace NKikimr::NDataStreams::V1 {
if (alreadyRead > (ui32)partitions.size()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Provided next_token is malformed - "
- "everything is already read", ctx);
+ "everything is already read");
}
const auto shardsToGet = std::min(partitions.size() - alreadyRead, MaxResults);
@@ -1944,16 +1950,16 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
- TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId);
}
}
- void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) {
ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
- TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId);
}
void TListShardsActor::SendResponse(const TActorContext& ctx) {
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index f66c46af46b..645a1ab7eeb 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -282,7 +282,7 @@ namespace NKikimr::NDataStreams::V1 {
if (!error.empty()) {
return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
Ydb::PersQueue::ErrorCode::BAD_REQUEST,
- error, ctx);
+ error);
}
if (this->Request_->GetSerializedToken().empty()) {
@@ -291,7 +291,7 @@ namespace NKikimr::NDataStreams::V1 {
Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
- << " is denied", ctx);
+ << " is denied");
}
}
NACLib::TUserToken token(this->Request_->GetSerializedToken());
@@ -332,7 +332,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access for stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << token.GetUserSID(), this->ActorContext());
+ << token.GetUserSID());
}
}
@@ -346,7 +346,7 @@ namespace NKikimr::NDataStreams::V1 {
Ydb::PersQueue::ErrorCode::BAD_REQUEST,
TStringBuilder() << "write to mirrored stream "
<< this->GetProtoRequest()->stream_name()
- << " is forbidden", this->ActorContext());
+ << " is forbidden");
}
@@ -526,10 +526,10 @@ namespace NKikimr::NDataStreams::V1 {
if (putRecordsResult.records(0).error_code() == "ProvisionedThroughputExceededException"
|| putRecordsResult.records(0).error_code() == "ThrottlingException")
{
- return ReplyWithError(Ydb::StatusIds::OVERLOADED, Ydb::PersQueue::ErrorCode::OVERLOAD, putRecordsResult.records(0).error_message(), ctx);
+ return ReplyWithError(Ydb::StatusIds::OVERLOADED, Ydb::PersQueue::ErrorCode::OVERLOAD, putRecordsResult.records(0).error_message());
}
//TODO: other codes - access denied and so on
- return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, putRecordsResult.records(0).error_message(), ctx);
+ return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, putRecordsResult.records(0).error_message());
}
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 2f4a76da2ac..73b85dd72af 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -29,9 +29,8 @@ namespace NKikimr::NGRpcProxy::V1 {
return value == compareTo ? defaultValue : value;
}
- TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx) {
+ TClientServiceTypes GetSupportedClientServiceTypes(const NKikimrPQ::TPQConfig& pqConfig) {
TClientServiceTypes serviceTypes;
- const auto& pqConfig = AppData(ctx)->PQConfig;
ui32 count = pqConfig.GetDefaultClientServiceType().GetMaxReadRulesCountPerTopic();
if (count == 0) count = Max<ui32>();
TString name = pqConfig.GetDefaultClientServiceType().GetName();
@@ -56,14 +55,13 @@ namespace NKikimr::NGRpcProxy::V1 {
return serviceTypes;
}
- TString ReadRuleServiceTypeMigration(NKikimrPQ::TPQTabletConfig *config, const TActorContext& ctx) {
+ TString ReadRuleServiceTypeMigration(NKikimrPQ::TPQTabletConfig *config, const NKikimrPQ::TPQConfig& pqConfig) {
auto rrServiceTypes = config->MutableReadRuleServiceTypes();
if (config->ReadRuleServiceTypesSize() > config->ReadRulesSize()) {
rrServiceTypes->Clear();
}
if (config->ReadRuleServiceTypesSize() < config->ReadRulesSize()) {
rrServiceTypes->Reserve(config->ReadRulesSize());
- const auto& pqConfig = AppData(ctx)->PQConfig;
if (pqConfig.GetDisallowDefaultClientServiceType()) {
return "service type must be set for all read rules";
}
@@ -78,10 +76,10 @@ namespace NKikimr::NGRpcProxy::V1 {
NKikimrPQ::TPQTabletConfig* config,
const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr,
const TClientServiceTypes& supportedClientServiceTypes,
- const TActorContext& ctx
+ const NKikimrPQ::TPQConfig& pqConfig
) {
- auto consumerName = NPersQueue::ConvertNewConsumerName(rr.consumer_name(), ctx);
+ auto consumerName = NPersQueue::ConvertNewConsumerName(rr.consumer_name(), pqConfig);
if (consumerName.empty()) {
return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
@@ -92,7 +90,7 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
{
- TString migrationError = ReadRuleServiceTypeMigration(config, ctx);
+ TString migrationError = ReadRuleServiceTypeMigration(config, pqConfig);
if (migrationError) {
return TMsgPqCodes(migrationError, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
@@ -184,7 +182,6 @@ namespace NKikimr::NGRpcProxy::V1 {
config->AddReadRuleServiceTypes(rr.service_type());
}
} else {
- const auto& pqConfig = AppData(ctx)->PQConfig;
if (pqConfig.GetDisallowDefaultClientServiceType()) {
return TMsgPqCodes(
TStringBuilder() << "service type cannot be empty for consumer '" << rr.consumer_name() << "'",
@@ -221,9 +218,10 @@ namespace NKikimr::NGRpcProxy::V1 {
const Ydb::Topic::Consumer& rr,
const TClientServiceTypes& supportedClientServiceTypes,
const bool checkServiceType,
- const TActorContext& ctx
+ const NKikimrPQ::TPQConfig& pqConfig,
+ bool enableTopicDiskSubDomainQuota
) {
- auto consumerName = NPersQueue::ConvertNewConsumerName(rr.name(), ctx);
+ auto consumerName = NPersQueue::ConvertNewConsumerName(rr.name(), pqConfig);
if (consumerName.find("/") != TString::npos || consumerName.find("|") != TString::npos) {
return TMsgPqCodes(TStringBuilder() << "consumer '" << rr.name() << "' has illegal symbols", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
@@ -231,7 +229,7 @@ namespace NKikimr::NGRpcProxy::V1 {
return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
{
- TString migrationError = ReadRuleServiceTypeMigration(config, ctx);
+ TString migrationError = ReadRuleServiceTypeMigration(config, pqConfig);
if (migrationError) {
return TMsgPqCodes(migrationError, migrationError.empty() ? Ydb::PersQueue::ErrorCode::OK : Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); //find better issueCode
}
@@ -261,7 +259,6 @@ namespace NKikimr::NGRpcProxy::V1 {
}
TString serviceType;
- const auto& pqConfig = AppData(ctx)->PQConfig;
const auto& defaultClientServiceType = pqConfig.GetDefaultClientServiceType().GetName();
serviceType = defaultClientServiceType;
@@ -349,7 +346,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
if (rr.important()) {
- if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) {
+ if (pqConfig.GetTopicsAreFirstClassCitizen() && !enableTopicDiskSubDomainQuota) {
return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
consumer->SetImportant(true);
@@ -366,7 +363,7 @@ namespace NKikimr::NGRpcProxy::V1 {
NKikimrPQ::TPQTabletConfig* config,
const NKikimrPQ::TPQTabletConfig& originalConfig,
const TString& consumerName,
- const TActorContext& ctx
+ const NKikimrPQ::TPQConfig& pqConfig
) {
config->ClearReadRuleVersions();
config->ClearReadRules();
@@ -387,7 +384,6 @@ namespace NKikimr::NGRpcProxy::V1 {
bool removed = false;
- const auto& pqConfig = AppData(ctx)->PQConfig;
if (NPQ::ReadRuleCompatible()) {
for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) {
auto& readRule = originalConfig.GetReadRules(i);
@@ -437,7 +433,7 @@ namespace NKikimr::NGRpcProxy::V1 {
bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config,
const TClientServiceTypes& supportedClientServiceTypes,
- TString& error, const TActorContext& ctx) {
+ TString& error, const NKikimrPQ::TPQConfig& pqConfig) {
size_t consumerCount = NPQ::ConsumerCount(config);
if (consumerCount > MAX_READ_RULES_COUNT) {
@@ -470,7 +466,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
if (config.GetCodecs().IdsSize() > 0) {
for (const auto& consumer : config.GetConsumers()) {
- TString name = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx);
+ TString name = NPersQueue::ConvertOldConsumerName(consumer.GetName(), pqConfig);
if (consumer.GetCodec().IdsSize() > 0) {
THashSet<i64> codecs;
@@ -492,13 +488,13 @@ namespace NKikimr::NGRpcProxy::V1 {
Ydb::StatusIds::StatusCode CheckConfig(const NKikimrPQ::TPQTabletConfig& config,
const TClientServiceTypes& supportedClientServiceTypes,
- TString& error, const TActorContext& ctx, const Ydb::StatusIds::StatusCode dubsStatus)
+ TString& error, const NKikimrPQ::TPQConfig& pqConfig, const Ydb::StatusIds::StatusCode dubsStatus)
{
ui32 speed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
ui32 burst = config.GetPartitionConfig().GetBurstSize();
std::set<ui32> validLimits {};
- if (AppData(ctx)->PQConfig.ValidWriteSpeedLimitsKbPerSecSize() == 0) {
+ if (pqConfig.ValidWriteSpeedLimitsKbPerSecSize() == 0) {
validLimits.insert(speed);
} else {
const auto& limits = AppData()->PQConfig.GetValidWriteSpeedLimitsKbPerSec();
@@ -546,7 +542,7 @@ namespace NKikimr::NGRpcProxy::V1 {
return Ydb::StatusIds::BAD_REQUEST;
}
- bool hasDuplicates = CheckReadRulesConfig(config, supportedClientServiceTypes, error, ctx);
+ bool hasDuplicates = CheckReadRulesConfig(config, supportedClientServiceTypes, error, pqConfig);
return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? dubsStatus : Ydb::StatusIds::BAD_REQUEST);
}
@@ -935,14 +931,14 @@ namespace NKikimr::NGRpcProxy::V1 {
}
{
- error = ReadRuleServiceTypeMigration(pqTabletConfig, ctx);
+ error = ReadRuleServiceTypeMigration(pqTabletConfig, pqConfig);
if (error) {
return Ydb::StatusIds::INTERNAL_ERROR;
}
}
- const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
+ const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(pqConfig);
for (const auto& rr : settings.read_rules()) {
- auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, ctx);
+ auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, pqConfig);
if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) {
error = messageAndCode.Message;
return Ydb::StatusIds::BAD_REQUEST;
@@ -1034,7 +1030,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST);
+ return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, pqConfig, Ydb::StatusIds::BAD_REQUEST);
}
static bool FillMeteringMode(Ydb::Topic::MeteringMode mode, NKikimrPQ::TPQTabletConfig& config,
@@ -1074,10 +1070,10 @@ namespace NKikimr::NGRpcProxy::V1 {
TYdbPqCodes FillProposeRequestImpl(
const TString& name, const Ydb::Topic::CreateTopicRequest& request,
- NKikimrSchemeOp::TModifyScheme& modifyScheme, const TActorContext& ctx,
+ NKikimrSchemeOp::TModifyScheme& modifyScheme, TAppData* appData,
TString& error, const TString& path, const TString& database, const TString& localDc
) {
- const auto& pqConfig = AppData(ctx)->PQConfig;
+ const auto& pqConfig = appData->PQConfig;
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup);
auto pqDescr = modifyScheme.MutableCreatePersQueueGroup();
@@ -1098,7 +1094,7 @@ namespace NKikimr::NGRpcProxy::V1 {
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
minParts = std::max<ui32>(1, settings.min_active_partitions());
- if (AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge() && request.has_partitioning_settings()) {
+ if (appData->FeatureFlags.GetEnableTopicSplitMerge() && request.has_partitioning_settings()) {
auto pqTabletConfigPartStrategy = pqTabletConfig->MutablePartitionStrategy();
auto autoscaleSettings = settings.auto_partitioning_settings();
pqTabletConfigPartStrategy->SetMinPartitionCount(minParts);
@@ -1214,7 +1210,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
{
- error = ReadRuleServiceTypeMigration(pqTabletConfig, ctx);
+ error = ReadRuleServiceTypeMigration(pqTabletConfig, pqConfig);
if (error) {
return TYdbPqCodes(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
@@ -1225,23 +1221,25 @@ namespace NKikimr::NGRpcProxy::V1 {
return TYdbPqCodes(code, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
- const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
+ const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(pqConfig);
for (const auto& rr : request.consumers()) {
- auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, true, ctx);
+ auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr, supportedClientServiceTypes, true, pqConfig,
+ appData->FeatureFlags.GetEnableTopicDiskSubDomainQuota());
if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) {
error = messageAndCode.Message;
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, messageAndCode.PQCode);
}
}
- return TYdbPqCodes(CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST), Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
+ return TYdbPqCodes(CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, pqConfig, Ydb::StatusIds::BAD_REQUEST),
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
const Ydb::Topic::AlterTopicRequest& request,
- NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx,
+ NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, TAppData* appData,
TString& error, bool isCdcStream
) {
#define CHECK_CDC if (isCdcStream) {\
@@ -1249,11 +1247,11 @@ namespace NKikimr::NGRpcProxy::V1 {
return Ydb::StatusIds::BAD_REQUEST;\
}
- const auto& pqConfig = AppData(ctx)->PQConfig;
+ const auto& pqConfig = appData->PQConfig;
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
NPQ::Migrate(*pqTabletConfig);
auto partConfig = pqTabletConfig->MutablePartitionConfig();
- auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();
+ auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge();
if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
@@ -1363,7 +1361,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
{
- error = ReadRuleServiceTypeMigration(pqTabletConfig, ctx);
+ error = ReadRuleServiceTypeMigration(pqTabletConfig, pqConfig);
if (error) {
return Ydb::StatusIds::INTERNAL_ERROR;
}
@@ -1374,7 +1372,7 @@ namespace NKikimr::NGRpcProxy::V1 {
return code;
}
- const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
+ const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(pqConfig);
std::vector<std::pair<bool, Ydb::Topic::Consumer>> consumers;
@@ -1383,7 +1381,7 @@ namespace NKikimr::NGRpcProxy::V1 {
for (const auto& c : pqTabletConfig->GetConsumers()) {
auto& oldName = c.GetName();
- auto name = NPersQueue::ConvertOldConsumerName(oldName, ctx);
+ auto name = NPersQueue::ConvertOldConsumerName(oldName, pqConfig);
bool erase = false;
for (auto consumer: request.drop_consumers()) {
@@ -1418,7 +1416,7 @@ namespace NKikimr::NGRpcProxy::V1 {
for (const auto& alter : request.alter_consumers()) {
auto name = alter.name();
- auto oldName = NPersQueue::ConvertOldConsumerName(name, ctx);
+ auto oldName = NPersQueue::ConvertOldConsumerName(name, pqConfig);
bool found = false;
for (auto& consumer : consumers) {
if (consumer.second.name() == name || consumer.second.name() == oldName) {
@@ -1445,13 +1443,14 @@ namespace NKikimr::NGRpcProxy::V1 {
pqTabletConfig->ClearConsumers();
for (const auto& rr : consumers) {
- auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr.second, supportedClientServiceTypes, rr.first, ctx);
+ auto messageAndCode = AddReadRuleToConfig(pqTabletConfig, rr.second, supportedClientServiceTypes, rr.first,
+ pqConfig, appData->FeatureFlags.GetEnableTopicDiskSubDomainQuota());
if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) {
error = messageAndCode.Message;
return Ydb::StatusIds::BAD_REQUEST;
}
}
- return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
+ return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, pqConfig, Ydb::StatusIds::ALREADY_EXISTS);
}
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index b39ec377f58..fbeb92225f3 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -47,7 +47,7 @@ namespace NKikimr::NGRpcProxy::V1 {
const TString& name,
const Ydb::Topic::CreateTopicRequest& request,
NKikimrSchemeOp::TModifyScheme& modifyScheme,
- const TActorContext& ctx,
+ TAppData* appData,
TString& error,
const TString& path,
const TString& database = TString(),
@@ -57,7 +57,7 @@ namespace NKikimr::NGRpcProxy::V1 {
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
const Ydb::Topic::AlterTopicRequest& request,
NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr,
- const TActorContext& ctx,
+ TAppData* appData,
TString& error,
bool isCdcStream
);
@@ -69,24 +69,24 @@ namespace NKikimr::NGRpcProxy::V1 {
TVector<TString> PasswordHashes;
};
typedef std::map<TString, TClientServiceType> TClientServiceTypes;
- TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx);
+ TClientServiceTypes GetSupportedClientServiceTypes(const NKikimrPQ::TPQConfig& pqConfig);
// Returns true if have duplicated read rules
Ydb::StatusIds::StatusCode CheckConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes,
- TString& error, const TActorContext& ctx,
+ TString& error, const NKikimrPQ::TPQConfig& pqConfig,
const Ydb::StatusIds::StatusCode dubsStatus = Ydb::StatusIds::BAD_REQUEST);
TMsgPqCodes AddReadRuleToConfig(
NKikimrPQ::TPQTabletConfig *config,
const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr,
const TClientServiceTypes& supportedReadRuleServiceTypes,
- const TActorContext& ctx
+ const NKikimrPQ::TPQConfig& pqConfig
);
TString RemoveReadRuleFromConfig(
NKikimrPQ::TPQTabletConfig *config,
const NKikimrPQ::TPQTabletConfig& originalConfig,
const TString& consumerName,
- const TActorContext& ctx
+ const NKikimrPQ::TPQConfig& pqConfig
);
NYql::TIssue FillIssue(const TString &errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode);
NYql::TIssue FillIssue(const TString &errorReason, const size_t errorCode);
@@ -119,7 +119,7 @@ namespace NKikimr::NGRpcProxy::V1 {
protected:
virtual TString GetTopicPath() const = 0;
- virtual void RespondWithCode(Ydb::StatusIds::StatusCode status) = 0;
+ virtual void RespondWithCode(Ydb::StatusIds::StatusCode status, bool notFound = false) = 0;
virtual void AddIssue(const NYql::TIssue& issue) = 0;
virtual bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const = 0;
@@ -158,7 +158,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
AddIssue(FillIssue(TStringBuilder() << "path '" << JoinPath(entry.Path) << "' is not a topic",
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
- RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
+ RespondWithCode(Ydb::StatusIds::SCHEME_ERROR, true);
return true;
}
@@ -202,7 +202,7 @@ namespace NKikimr::NGRpcProxy::V1 {
Ydb::PersQueue::ErrorCode::ACCESS_DENIED
)
);
- return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR, true);
}
case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: {
AddIssue(
@@ -240,6 +240,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
+ public:
void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
@@ -296,7 +297,7 @@ namespace NKikimr::NGRpcProxy::V1 {
protected:
// TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name);
- void SendProposeRequest(const NActors::TActorContext &ctx) {
+ void SendProposeRequest(const NActors::TActorContext& ctx) {
std::pair <TString, TString> pathPair;
try {
pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath());
@@ -316,7 +317,7 @@ namespace NKikimr::NGRpcProxy::V1 {
if (this->Request_->GetSerializedToken().empty()) {
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
- "Unauthenticated access is forbidden, please provide credentials", ctx);
+ "Unauthenticated access is forbidden, please provide credentials");
}
} else {
proposal->Record.SetUserToken(this->Request_->GetSerializedToken());
@@ -363,21 +364,22 @@ namespace NKikimr::NGRpcProxy::V1 {
}
void ReplyWithError(Ydb::StatusIds::StatusCode status, size_t additionalStatus,
- const TString& messageText, const NActors::TActorContext& ctx) {
+ const TString& messageText) {
if (TActorBase::IsDead)
return;
this->Request_->RaiseIssue(FillIssue(messageText, additionalStatus));
this->Request_->ReplyWithYdbStatus(status);
- this->Die(ctx);
+ this->Die(this->ActorContext());
TActorBase::IsDead = true;
}
- void RespondWithCode(Ydb::StatusIds::StatusCode status) override {
+ void RespondWithCode(Ydb::StatusIds::StatusCode status, bool notFound = false) override {
if (TActorBase::IsDead)
return;
this->Request_->ReplyWithYdbStatus(status);
this->Die(this->ActorContext());
TActorBase::IsDead = true;
+ Y_UNUSED(notFound);
}
template<class TProtoResult>
@@ -403,25 +405,22 @@ namespace NKikimr::NGRpcProxy::V1 {
//-----------------------------------------------------------------------------------
- template<class TDerived, class TRequest>
- class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest> {
- using TBase = TPQGrpcSchemaBase<TDerived, TRequest>;
-
+ template<class TDerived>
+ class TUpdateSchemeActorBase {
public:
- TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request, const TString& topicPath)
- : TBase(request, topicPath)
- {}
- TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request)
- : TBase(request)
- {}
- ~TUpdateSchemeActor() = default;
+ ~TUpdateSchemeActorBase() = default;
- void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx,
- const TString& workingDir, const TString& name)
+ virtual void ModifyPersqueueConfig(TAppData* appData,
+ NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
+ const NKikimrSchemeOp::TDirEntry& selfInfo
+ ) = 0;
+
+ protected:
+ void FillModifyScheme(NKikimrSchemeOp::TModifyScheme& modifyScheme, const TActorContext& ctx,
+ const TString& workingDir, const TString& name)
{
Y_UNUSED(name);
- const auto& response = DescribeSchemeResult->Get()->Request.Get()->ResultSet.front();
- NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
modifyScheme.SetWorkingDir(workingDir);
@@ -430,6 +429,8 @@ namespace NKikimr::NGRpcProxy::V1 {
}
auto* config = modifyScheme.MutableAlterPersQueueGroup();
+ const auto& response = DescribeSchemeResult->Get()->Request.Get()->ResultSet.front();
+
Y_ABORT_UNLESS(response.Self);
Y_ABORT_UNLESS(response.PQGroupInfo);
config->CopyFrom(response.PQGroupInfo->Description);
@@ -444,33 +445,64 @@ namespace NKikimr::NGRpcProxy::V1 {
applyIf->SetPathVersion(response.Self->Info.GetPathVersion());
}
- static_cast<TDerived*>(this)->ModifyPersqueueConfig(
- ctx,
+ ModifyPersqueueConfig(
+ AppData(ctx),
*config,
response.PQGroupInfo->Description,
response.Self->Info
);
-
- this->DescribeSchemeResult.Reset();
}
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
DescribeSchemeResult = std::move(ev);
+ }
+ protected:
+ THolder<NActors::TEventHandle<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DescribeSchemeResult;
+ };
+
+
+ template<class TDerived, class TRequest>
+ class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest>,
+ public TUpdateSchemeActorBase<TDerived> {
+ using TBase = TPQGrpcSchemaBase<TDerived, TRequest>;
+ using TUpdateSchemeBase = TUpdateSchemeActorBase<TDerived>;
+
+ public:
+ TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request, const TString& topicPath)
+ : TBase(request, topicPath)
+ {}
+ TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request)
+ : TBase(request)
+ {}
+ ~TUpdateSchemeActor() = default;
+
+
+ void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx,
+ const TString& workingDir, const TString& name)
+ {
+ NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
+ this->FillModifyScheme(modifyScheme, ctx, workingDir, name);
+ this->DescribeSchemeResult.Reset();
+ }
+
+
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override {
+ TUpdateSchemeBase::HandleCacheNavigateResponse(ev);
return this->SendProposeRequest(this->ActorContext());
}
+
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
auto msg = ev->Get();
const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(ev->Get()->Record.GetStatus());
- if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed)
+ if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed)
{
return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED,
Ydb::PersQueue::ErrorCode::OVERLOAD,
- TStringBuilder() << "Topic with name " << TBase::GetTopicPath() << " has another alter in progress",
- ctx);
+ TStringBuilder() << "Topic with name " << TBase::GetTopicPath() << " has another alter in progress");
}
return TBase::TBase::Handle(ev, ctx);
@@ -482,12 +514,8 @@ namespace NKikimr::NGRpcProxy::V1 {
default: TBase::StateWork(ev);
}
}
-
- protected:
- THolder<NActors::TEventHandle<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DescribeSchemeResult;
};
-
template<class TDerived, class TRequest, class TEvResponse>
class TPQInternalSchemaActor : public TPQSchemaBase<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>>
, public TActorBootstrapped<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>>
@@ -542,19 +570,32 @@ namespace NKikimr::NGRpcProxy::V1 {
Response->Issues.AddIssue(issue);
}
-
- void RespondWithCode(Ydb::StatusIds::StatusCode status) override {
- Response->Status = status;
- this->ActorContext().Send(Requester, Response.Release());
+ void RespondWithCode(Ydb::StatusIds::StatusCode status, bool notFound = false) override {
+ if (!RespondOverride(status, notFound)) {
+ Response->Status = status;
+ this->ActorContext().Send(Requester, Response.Release());
+ }
this->Die(this->ActorContext());
TBase::IsDead = true;
}
+ protected:
+ const TRequest& GetRequest() const {
+ return Request;
+ }
+
+ virtual bool RespondOverride(Ydb::StatusIds::StatusCode status, bool notFound) {
+ Y_UNUSED(status);
+ Y_UNUSED(notFound);
+ return false;
+ }
+
private:
TRequest Request;
TActorId Requester;
TMaybe<TString> PrivateTopicName;
+
protected:
THolder<TEvResponse> Response;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index d311d8bfa77..96f3471b5ce 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -25,6 +25,16 @@ using namespace Ydb;
// ui64 Cookie;
// };
+struct TLocalResponseBase {
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
+
+struct TAlterTopicResponse : public TLocalResponseBase {
+ NKikimrSchemeOp::TModifyScheme ModifyScheme;
+};
+
struct TEvPQProxy {
enum EEv {
EvWriteInit = EventSpaceBegin(TKikimrEvents::ES_PQ_PROXY_NEW), // TODO: Replace 'NEW' with version or something
@@ -77,6 +87,7 @@ struct TEvPQProxy {
EvDirectReadSendClientData,
EvReadingStarted,
EvReadingFinished,
+ EvAlterTopicResponse,
EvEnd
};
@@ -490,11 +501,6 @@ struct TEvPQProxy {
ui64 TabletId;
};
- struct TLocalResponseBase {
- Ydb::StatusIds::StatusCode Status;
- NYql::TIssues Issues;
- };
-
struct TPartitionLocationInfo {
ui64 PartitionId;
ui64 Generation;
@@ -635,6 +641,11 @@ struct TEvPQProxy {
std::vector<ui32> AdjacentPartitionIds;
std::vector<ui32> ChildPartitionIds;
};
+
+ struct TEvAlterTopicResponse : public TEventLocal<TEvAlterTopicResponse, EvAlterTopicResponse>
+ , public TLocalResponseBase {
+ TAlterTopicResponse Response;
+ };
};
struct TLocalRequestBase {
@@ -662,4 +673,23 @@ struct TGetPartitionsLocationRequest : public TLocalRequestBase {
TVector<ui32> PartitionIds;
};
+
+struct TAlterTopicRequest : public TLocalRequestBase {
+ TAlterTopicRequest(Ydb::Topic::AlterTopicRequest&& request, const TString& workDir, const TString& name,
+ const TString& database, const TString& token, bool missingOk)
+ : TLocalRequestBase(request.path(), database, token)
+ , Request(std::move(request))
+ , WorkingDir(workDir)
+ , Name(name)
+ , MissingOk(missingOk)
+ {}
+
+ Ydb::Topic::AlterTopicRequest Request;
+ TString WorkingDir;
+ TString Name;
+ bool MissingOk;
+};
+
+
+
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index b953a9f0198..c68c6fe205a 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -222,31 +222,32 @@ void TAddReadRuleActor::Bootstrap(const NActors::TActorContext& ctx) {
}
void TAddReadRuleActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(pqGroupDescription);
- auto* pqConfig = groupConfig.MutablePQTabletConfig();
+ auto* tabletConfig = groupConfig.MutablePQTabletConfig();
+ const auto& pqConfig = appData->PQConfig;
auto rule = GetProtoRequest()->read_rule();
if (rule.version() == 0) {
rule.set_version(selfInfo.GetVersion().GetPQVersion());
}
- auto serviceTypes = GetSupportedClientServiceTypes(ctx);
+ auto serviceTypes = GetSupportedClientServiceTypes(pqConfig);
TString error;
- auto messageAndCode = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx);
+ auto messageAndCode = AddReadRuleToConfig(tabletConfig, rule, serviceTypes, pqConfig);
auto status = messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK ?
- CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS)
+ CheckConfig(*tabletConfig, serviceTypes, messageAndCode.Message, pqConfig, Ydb::StatusIds::ALREADY_EXISTS)
: Ydb::StatusIds::BAD_REQUEST;
if (status != Ydb::StatusIds::SUCCESS) {
return ReplyWithError(status,
status == Ydb::StatusIds::ALREADY_EXISTS ? Ydb::PersQueue::ErrorCode::OK
: Ydb::PersQueue::ErrorCode::BAD_REQUEST,
- messageAndCode.Message, ctx);
+ messageAndCode.Message);
}
}
@@ -263,7 +264,7 @@ void TRemoveReadRuleActor::Bootstrap(const NActors::TActorContext& ctx) {
}
void TRemoveReadRuleActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
@@ -274,10 +275,10 @@ void TRemoveReadRuleActor::ModifyPersqueueConfig(
groupConfig.MutablePQTabletConfig(),
pqGroupDescription.GetPQTabletConfig(),
GetProtoRequest()->consumer_name(),
- ctx
+ appData->PQConfig
);
if (!error.Empty()) {
- return ReplyWithError(Ydb::StatusIds::NOT_FOUND, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return ReplyWithError(Ydb::StatusIds::NOT_FOUND, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error);
}
}
@@ -373,7 +374,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
{
TString error;
- auto status = FillProposeRequestImpl(name, *GetProtoRequest(), modifyScheme, ctx, error,
+ auto status = FillProposeRequestImpl(name, *GetProtoRequest(), modifyScheme, AppData(ctx), error,
workingDir, proposal.Record.GetDatabaseName(), LocalCluster).YdbCode;
if (!error.empty()) {
@@ -426,7 +427,7 @@ void TAlterTopicActor::Bootstrap(const NActors::TActorContext& ctx) {
}
void TAlterTopicActor::ModifyPersqueueConfig(
- const TActorContext& ctx,
+ TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
@@ -436,7 +437,7 @@ void TAlterTopicActor::ModifyPersqueueConfig(
TString error;
Y_UNUSED(selfInfo);
- auto status = FillProposeRequestImpl(*GetProtoRequest(), groupConfig, ctx, error, GetCdcStreamName().Defined());
+ auto status = FillProposeRequestImpl(*GetProtoRequest(), groupConfig, appData, error, GetCdcStreamName().Defined());
if (!error.empty()) {
Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
return RespondWithCode(status);
@@ -1564,4 +1565,60 @@ void TPartitionsLocationActor::RaiseError(const TString& error, const Ydb::PersQ
this->RespondWithCode(status);
}
+TAlterTopicActorInternal::TAlterTopicActorInternal(
+ TAlterTopicActorInternal::TRequest&& request,
+ NThreading::TPromise<TAlterTopicResponse>&& promise,
+ bool missingOk
+)
+ : TActorBase(std::move(request), TActorId{})
+ , Promise(std::move(promise))
+ , MissingOk(missingOk)
+{}
+
+void TAlterTopicActorInternal::Bootstrap(const NActors::TActorContext&) {
+ SendDescribeProposeRequest();
+ Become(&TAlterTopicActorInternal::StateWork);
+}
+
+void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ if (!TActorBase::HandleCacheNavigateResponseBase(ev)) {
+ this->Die(ActorContext());
+ return;
+ }
+ TUpdateSchemeBase::HandleCacheNavigateResponse(ev);
+ auto& schemeTx = Response->Response.ModifyScheme;
+ FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name);
+}
+
+void TAlterTopicActorInternal::ModifyPersqueueConfig(
+ TAppData* appData,
+ NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
+ const NKikimrSchemeOp::TDirEntry& selfInfo
+) {
+ Y_UNUSED(pqGroupDescription);
+ Y_UNUSED(selfInfo);
+ TString error;
+ Y_UNUSED(selfInfo);
+
+ auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false);
+ if (!error.empty()) {
+ Response->Response.Issues.AddIssue(error);
+ }
+ RespondWithCode(status);
+}
+
+bool TAlterTopicActorInternal::RespondOverride(Ydb::StatusIds::StatusCode status, bool notFound) {
+ if (MissingOk && notFound) {
+ Response->Response.Status = Ydb::StatusIds::SUCCESS;
+ Response->Response.ModifyScheme.Clear();
+
+ } else {
+ Response->Response.Status = status;
+ Response->Response.Issues.AddIssues(std::move(Response->Issues));
+ }
+ Promise.SetValue(std::move(Response->Response));
+ return true;
+}
+
} // namespace NKikimr::NGRpcProxy::V1
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h
index 6b37cb91138..b6c00f7cdb1 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.h
+++ b/ydb/services/persqueue_v1/actors/schema_actors.h
@@ -282,10 +282,10 @@ public:
TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request);
void Bootstrap(const NActors::TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
};
class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest>
@@ -297,10 +297,10 @@ public:
TRemoveReadRuleActor(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest* request);
void Bootstrap(const NActors::TActorContext &ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
};
@@ -374,13 +374,47 @@ public:
TAlterTopicActor(NKikimr::NGRpcService::IRequestOpCtx* request);
void Bootstrap(const NActors::TActorContext& ctx);
- void ModifyPersqueueConfig(const TActorContext& ctx,
+ void ModifyPersqueueConfig(TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
- const NKikimrSchemeOp::TDirEntry& selfInfo);
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
};
+class TAlterTopicActorInternal : public TPQInternalSchemaActor<TAlterTopicActorInternal, NKikimr::NGRpcProxy::V1::TAlterTopicRequest,
+ TEvPQProxy::TEvAlterTopicResponse>
+ , public TUpdateSchemeActorBase<TAlterTopicActorInternal>
+ , public TCdcStreamCompatible
+{
+ using TUpdateSchemeBase = TUpdateSchemeActorBase<TAlterTopicActorInternal>;
+ using TRequest = NKikimr::NGRpcProxy::V1::TAlterTopicRequest;
+ using TActorBase = TPQInternalSchemaActor<TAlterTopicActorInternal, TRequest, TEvPQProxy::TEvAlterTopicResponse>;
+
+public:
+ TAlterTopicActorInternal(NKikimr::NGRpcProxy::V1::TAlterTopicRequest&& request,
+ NThreading::TPromise<TAlterTopicResponse>&& promise,
+ bool notExistsOk);
+
+ void Bootstrap(const NActors::TActorContext& ctx) override;
+ void ModifyPersqueueConfig(TAppData* appData,
+ NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
+ const NKikimrSchemeOp::TDirEntry& selfInfo) override;
+
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;
+
+ void StateWork(TAutoPtr<IEventHandle>& ev) {
+ TActorBase::StateWork(ev);
+ }
+
+protected:
+ bool RespondOverride(Ydb::StatusIds::StatusCode status, bool notFound) override;
+
+private:
+ NThreading::TPromise<TAlterTopicResponse> Promise;
+ bool MissingOk;
+};
+
class TPartitionsLocationActor : public TPQInternalSchemaActor<TPartitionsLocationActor,
TGetPartitionsLocationRequest,
TEvPQProxy::TEvPartitionLocationResponse>