aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-06-28 01:01:36 +0300
committeralexnick <alexnick@yandex-team.ru>2022-06-28 01:01:36 +0300
commitd035a83fdacc62590460e2589854afeba02055b9 (patch)
tree6ee44a51f87022d59450d5d60c738c69ae201c90
parent9fc927b5b83b5304ffedfaaf8472ed9e3973cca9 (diff)
downloadydb-d035a83fdacc62590460e2589854afeba02055b9.tar.gz
restrictions on supported codecs inside ydb LOGBROKER-7533
ref:3d370cafe7740d66ccab1b9bb80d74b79c3d4562
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp2
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp2
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp27
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h20
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp2
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp4
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp8
7 files changed, 53 insertions, 12 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
index 8502ff25f9..1f662160b2 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
@@ -12,7 +12,7 @@
namespace NYdb::NPersQueue {
const TVector<ECodec>& GetDefaultCodecs() {
- static const TVector<ECodec> codecs = {ECodec::RAW, ECodec::GZIP, ECodec::LZOP};
+ static const TVector<ECodec> codecs = {};
return codecs;
}
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index fba19c7d00..582e899e42 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -987,7 +987,7 @@ namespace NKikimr::NDataStreams::V1 {
TString error = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx);
bool hasDuplicates = false;
if (error.Empty()) {
- hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error);
+ hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error, ctx);
}
if (!error.Empty()) {
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index dff7127e6f..307b555fdd 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -269,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 {
bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config,
const TClientServiceTypes& supportedClientServiceTypes,
- TString& error) {
+ TString& error, const TActorContext& ctx) {
if (config.GetReadRules().size() > MAX_READ_RULES_COUNT) {
error = TStringBuilder() << "read rules count cannot be more than "
@@ -299,6 +299,25 @@ namespace NKikimr::NGRpcProxy::V1 {
return false;
}
}
+ if (config.GetCodecs().IdsSize() > 0) {
+ for (ui32 i = 0; i < config.ConsumerCodecsSize(); ++i) {
+ TString name = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx);
+
+ auto& consumerCodecs = config.GetConsumerCodecs(i);
+ if (consumerCodecs.IdsSize() > 0) {
+ THashSet<i64> codecs;
+ for (auto& cc : consumerCodecs.GetIds()) {
+ codecs.insert(cc);
+ }
+ for (auto& cc : config.GetCodecs().GetIds()) {
+ if (codecs.find(cc) == codecs.end()) {
+ error = TStringBuilder() << "for consumer '" << name << "' got unsupported codec " << (cc+1) << " which is suppored by topic";
+ return false;
+ }
+ }
+ }
+ }
+ }
return false;
}
@@ -712,7 +731,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- CheckReadRulesConfig(*config, supportedClientServiceTypes, error);
+ CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx);
return error.empty() ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::BAD_REQUEST;
}
@@ -852,7 +871,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- CheckReadRulesConfig(*config, supportedClientServiceTypes, error);
+ CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx);
return error.empty() ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::BAD_REQUEST;
}
@@ -1019,7 +1038,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- bool hasDuplicates = CheckReadRulesConfig(*config, supportedClientServiceTypes, error);
+ bool hasDuplicates = CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx);
return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? Ydb::StatusIds::ALREADY_EXISTS : Ydb::StatusIds::BAD_REQUEST);
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 5cc8239c23..337c125bce 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -51,7 +51,7 @@ namespace NKikimr::NGRpcProxy::V1 {
TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx);
// Returns true if have duplicated read rules
- bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, TString& error);
+ bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, TString& error, const TActorContext& ctx);
TString AddReadRuleToConfig(
NKikimrPQ::TPQTabletConfig *config,
@@ -361,9 +361,25 @@ namespace NKikimr::NGRpcProxy::V1 {
return this->SendProposeRequest(ctx);
}
+ void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
+ auto msg = ev->Get();
+ const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(ev->Get()->Record.GetStatus());
+
+ if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed)
+ {
+ return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED,
+ Ydb::PersQueue::ErrorCode::ERROR,
+ TStringBuilder() << "Topic with name " << TBase::GetTopicPath(ctx) << " has another alter in progress",
+ ctx);
+ }
+
+ return TBase::TBase::Handle(ev, ctx);
+ }
+
void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
- default: TBase::StateWork(ev, ctx);
+ HFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
+ default: TBase::StateWork(ev, ctx);
}
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 8325614cd7..94955b96c7 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -239,7 +239,7 @@ void TAddReadRuleActor::ModifyPersqueueConfig(
TString error = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx);
bool hasDuplicates = false;
if (error.Empty()) {
- hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error);
+ hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error, ctx);
}
if (!error.Empty()) {
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 2a4aed1fc7..1dfaa8a31e 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -622,7 +622,7 @@ namespace NKikimr::NPersQueueTests {
rr->set_version(0);
rr->set_important(true);
rr->set_supported_format(TopicSettings::FORMAT_BASE);
- rr->add_supported_codecs(CODEC_ZSTD);
+ rr->add_supported_codecs(CODEC_GZIP);
auto status = stub->AddReadRule(&grpcContext, addRuleRequest, &addRuleResponse);
Cerr << "ADD RR RESPONSE " << addRuleResponse << "\n";
UNIT_ASSERT(status.ok() && addRuleResponse.operation().status() == Ydb::StatusIds::SUCCESS);
@@ -696,7 +696,7 @@ namespace NKikimr::NPersQueueTests {
rr->set_version(0);
rr->set_important(true);
rr->set_supported_format(TopicSettings::FORMAT_BASE);
- rr->add_supported_codecs(CODEC_ZSTD);
+ rr->add_supported_codecs(CODEC_GZIP);
auto status = stub->AddReadRule(&grpcContext, addRuleRequest, &addRuleResponse);
Cerr << addRuleResponse << "\n";
UNIT_ASSERT(status.ok() && addRuleResponse.operation().status() == Ydb::StatusIds::SUCCESS);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 05c4064d13..22aba56762 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3035,8 +3035,14 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
(*request.mutable_alter_attributes())["_allow_unauthenticated_read"] = "true";
(*request.mutable_alter_attributes())["_partitions_per_tablet"] = "5";
- alter(request, Ydb::StatusIds::SUCCESS, false);
+ rr->mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_LZOP);
+
+ alter(request, Ydb::StatusIds::BAD_REQUEST, false);
+
+ rr->mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_CUSTOM + 5);
+
+ alter(request, Ydb::StatusIds::SUCCESS, false);
request = Ydb::Topic::AlterTopicRequest{};
request.set_path(TStringBuilder() << "/Root/PQ/" << topic3);