diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-06-28 01:01:36 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-06-28 01:01:36 +0300 |
commit | d035a83fdacc62590460e2589854afeba02055b9 (patch) | |
tree | 6ee44a51f87022d59450d5d60c738c69ae201c90 | |
parent | 9fc927b5b83b5304ffedfaaf8472ed9e3973cca9 (diff) | |
download | ydb-d035a83fdacc62590460e2589854afeba02055b9.tar.gz |
restrictions on supported codecs inside ydb LOGBROKER-7533
ref:3d370cafe7740d66ccab1b9bb80d74b79c3d4562
7 files changed, 53 insertions, 12 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp index 8502ff25f9..1f662160b2 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp @@ -12,7 +12,7 @@ namespace NYdb::NPersQueue { const TVector<ECodec>& GetDefaultCodecs() { - static const TVector<ECodec> codecs = {ECodec::RAW, ECodec::GZIP, ECodec::LZOP}; + static const TVector<ECodec> codecs = {}; return codecs; } diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index fba19c7d00..582e899e42 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -987,7 +987,7 @@ namespace NKikimr::NDataStreams::V1 { TString error = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx); bool hasDuplicates = false; if (error.Empty()) { - hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error); + hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error, ctx); } if (!error.Empty()) { diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index dff7127e6f..307b555fdd 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -269,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 { bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedClientServiceTypes, - TString& error) { + TString& error, const TActorContext& ctx) { if (config.GetReadRules().size() > MAX_READ_RULES_COUNT) { error = TStringBuilder() << "read rules count cannot be more than " @@ -299,6 +299,25 @@ namespace NKikimr::NGRpcProxy::V1 { return false; } } + if (config.GetCodecs().IdsSize() > 0) { + for (ui32 i = 0; i < config.ConsumerCodecsSize(); ++i) { + TString name = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + + auto& consumerCodecs = config.GetConsumerCodecs(i); + if (consumerCodecs.IdsSize() > 0) { + THashSet<i64> codecs; + for (auto& cc : consumerCodecs.GetIds()) { + codecs.insert(cc); + } + for (auto& cc : config.GetCodecs().GetIds()) { + if (codecs.find(cc) == codecs.end()) { + error = TStringBuilder() << "for consumer '" << name << "' got unsupported codec " << (cc+1) << " which is suppored by topic"; + return false; + } + } + } + } + } return false; } @@ -712,7 +731,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - CheckReadRulesConfig(*config, supportedClientServiceTypes, error); + CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx); return error.empty() ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::BAD_REQUEST; } @@ -852,7 +871,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - CheckReadRulesConfig(*config, supportedClientServiceTypes, error); + CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx); return error.empty() ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::BAD_REQUEST; } @@ -1019,7 +1038,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - bool hasDuplicates = CheckReadRulesConfig(*config, supportedClientServiceTypes, error); + bool hasDuplicates = CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx); return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? Ydb::StatusIds::ALREADY_EXISTS : Ydb::StatusIds::BAD_REQUEST); } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 5cc8239c23..337c125bce 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -51,7 +51,7 @@ namespace NKikimr::NGRpcProxy::V1 { TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx); // Returns true if have duplicated read rules - bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, TString& error); + bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, TString& error, const TActorContext& ctx); TString AddReadRuleToConfig( NKikimrPQ::TPQTabletConfig *config, @@ -361,9 +361,25 @@ namespace NKikimr::NGRpcProxy::V1 { return this->SendProposeRequest(ctx); } + void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) { + auto msg = ev->Get(); + const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(ev->Get()->Record.GetStatus()); + + if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed) + { + return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED, + Ydb::PersQueue::ErrorCode::ERROR, + TStringBuilder() << "Topic with name " << TBase::GetTopicPath(ctx) << " has another alter in progress", + ctx); + } + + return TBase::TBase::Handle(ev, ctx); + } + void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { - default: TBase::StateWork(ev, ctx); + HFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle); + default: TBase::StateWork(ev, ctx); } } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 8325614cd7..94955b96c7 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -239,7 +239,7 @@ void TAddReadRuleActor::ModifyPersqueueConfig( TString error = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx); bool hasDuplicates = false; if (error.Empty()) { - hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error); + hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error, ctx); } if (!error.Empty()) { diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 2a4aed1fc7..1dfaa8a31e 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -622,7 +622,7 @@ namespace NKikimr::NPersQueueTests { rr->set_version(0); rr->set_important(true); rr->set_supported_format(TopicSettings::FORMAT_BASE); - rr->add_supported_codecs(CODEC_ZSTD); + rr->add_supported_codecs(CODEC_GZIP); auto status = stub->AddReadRule(&grpcContext, addRuleRequest, &addRuleResponse); Cerr << "ADD RR RESPONSE " << addRuleResponse << "\n"; UNIT_ASSERT(status.ok() && addRuleResponse.operation().status() == Ydb::StatusIds::SUCCESS); @@ -696,7 +696,7 @@ namespace NKikimr::NPersQueueTests { rr->set_version(0); rr->set_important(true); rr->set_supported_format(TopicSettings::FORMAT_BASE); - rr->add_supported_codecs(CODEC_ZSTD); + rr->add_supported_codecs(CODEC_GZIP); auto status = stub->AddReadRule(&grpcContext, addRuleRequest, &addRuleResponse); Cerr << addRuleResponse << "\n"; UNIT_ASSERT(status.ok() && addRuleResponse.operation().status() == Ydb::StatusIds::SUCCESS); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 05c4064d13..22aba56762 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3035,8 +3035,14 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { (*request.mutable_alter_attributes())["_allow_unauthenticated_read"] = "true"; (*request.mutable_alter_attributes())["_partitions_per_tablet"] = "5"; - alter(request, Ydb::StatusIds::SUCCESS, false); + rr->mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_LZOP); + + alter(request, Ydb::StatusIds::BAD_REQUEST, false); + + rr->mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_CUSTOM + 5); + + alter(request, Ydb::StatusIds::SUCCESS, false); request = Ydb::Topic::AlterTopicRequest{}; request.set_path(TStringBuilder() << "/Root/PQ/" << topic3); |