aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-12 17:40:47 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-12 17:40:47 +0300
commit42b3d6aa13ee009b67f2178dab9fe177a0fc96dd (patch)
treef77b711395a026290ce6eabb7fa48ca83b8093f3
parentee42a6e9701086b2a04cf4f4c4d29a07d99b884f (diff)
downloadydb-42b3d6aa13ee009b67f2178dab9fe177a0fc96dd.tar.gz
Check 'Created' flag for pq KIKIMR-14309
ref:6bb4cfd50fcbcce2807926fa98c32edca288cfef
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp2
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp4
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h9
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp8
5 files changed, 22 insertions, 3 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index 78b83ef3a3..d14fd75ebe 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -322,7 +322,7 @@ bool TPersQueueBaseRequestProcessor::ReadyToCreateChildren() const {
bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) {
for (const auto& child : SchemeCacheResponse->ResultSet) {
- if (child.Kind == TSchemeCacheNavigate::EKind::KindTopic) {
+ if (child.Kind == TSchemeCacheNavigate::EKind::KindTopic && child.PQGroupInfo) {
TString name = child.PQGroupInfo->Description.GetName();
if (name.empty()) {
name = child.Path.back();
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 8e778a4a1d..708f62362f 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -1400,7 +1400,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
case NKikimrSchemeOp::EPathTypePersQueueGroup:
Kind = TNavigate::KindTopic;
IsPrivatePath = CalcPathIsPrivate(entryDesc.GetPathType(), entryDesc.GetPathSubType());
- FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup()));
+ if (Created) {
+ FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup()));
+ }
break;
case NKikimrSchemeOp::EPathTypeCdcStream:
Kind = TNavigate::KindCdcStream;
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 1be2e7fa24..b056a05e64 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -157,6 +157,15 @@ namespace NKikimr::NGRpcProxy::V1 {
switch (response.Status) {
case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: {
+ if (!result->ResultSet.front().PQGroupInfo) {
+ this->Request_->RaiseIssue(
+ FillIssue(
+ TStringBuilder() << "path '" << path << "' creation is not completed",
+ Ydb::PersQueue::ErrorCode::ERROR
+ )
+ );
+ return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ }
return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev, ctx);
}
break;
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 654edfcfcd..f0b031051c 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -2634,8 +2634,8 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, THashMap<TString, TTopicHolder>::iterator topicsIter,
const TActorContext& ctx
) {
+ Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse()
auto& pqDescr = entry.PQGroupInfo->Description;
- Y_VERIFY(entry.PQGroupInfo);
topicsIter->second.TabletID = pqDescr.GetBalancerTabletID();
return CheckTopicACL(entry, topicsIter->first, ctx);
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index 3038cc82a6..86f101df9b 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -416,6 +416,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
CloseSession(processResult.Reason, processResult.ErrorCode, ctx);
return;
}
+ Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse()
auto& description = entry.PQGroupInfo->Description;
Y_VERIFY(description.PartitionsSize() > 0);
Y_VERIFY(description.HasPQTabletConfig());
@@ -470,6 +471,13 @@ void TWriteSessionActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::
PersQueue::ErrorCode::ERROR, ctx
);
}
+ if (!navigate->ResultSet.front().PQGroupInfo) {
+ return CloseSession(
+ TStringBuilder() << "topic '" << TopicConverter->GetClientsideName() << "' describe error"
+ << ", reason: could not retrieve topic description",
+ PersQueue::ErrorCode::ERROR, ctx
+ );
+ }
const auto& pqDescription = navigate->ResultSet.front().PQGroupInfo->Description;