diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-12 17:40:47 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-12 17:40:47 +0300 |
commit | 42b3d6aa13ee009b67f2178dab9fe177a0fc96dd (patch) | |
tree | f77b711395a026290ce6eabb7fa48ca83b8093f3 | |
parent | ee42a6e9701086b2a04cf4f4c4d29a07d99b884f (diff) | |
download | ydb-42b3d6aa13ee009b67f2178dab9fe177a0fc96dd.tar.gz |
Check 'Created' flag for pq KIKIMR-14309
ref:6bb4cfd50fcbcce2807926fa98c32edca288cfef
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/scheme_board/cache.cpp | 4 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.h | 9 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write_actor.cpp | 8 |
5 files changed, 22 insertions, 3 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index 78b83ef3a3..d14fd75ebe 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -322,7 +322,7 @@ bool TPersQueueBaseRequestProcessor::ReadyToCreateChildren() const { bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) { for (const auto& child : SchemeCacheResponse->ResultSet) { - if (child.Kind == TSchemeCacheNavigate::EKind::KindTopic) { + if (child.Kind == TSchemeCacheNavigate::EKind::KindTopic && child.PQGroupInfo) { TString name = child.PQGroupInfo->Description.GetName(); if (name.empty()) { name = child.Path.back(); diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 8e778a4a1d..708f62362f 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -1400,7 +1400,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { case NKikimrSchemeOp::EPathTypePersQueueGroup: Kind = TNavigate::KindTopic; IsPrivatePath = CalcPathIsPrivate(entryDesc.GetPathType(), entryDesc.GetPathSubType()); - FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup())); + if (Created) { + FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup())); + } break; case NKikimrSchemeOp::EPathTypeCdcStream: Kind = TNavigate::KindCdcStream; diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 1be2e7fa24..b056a05e64 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -157,6 +157,15 @@ namespace NKikimr::NGRpcProxy::V1 { switch (response.Status) { case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: { + if (!result->ResultSet.front().PQGroupInfo) { + this->Request_->RaiseIssue( + FillIssue( + TStringBuilder() << "path '" << path << "' creation is not completed", + Ydb::PersQueue::ErrorCode::ERROR + ) + ); + return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev, ctx); } break; diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 654edfcfcd..f0b031051c 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -2634,8 +2634,8 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, THashMap<TString, TTopicHolder>::iterator topicsIter, const TActorContext& ctx ) { + Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse() auto& pqDescr = entry.PQGroupInfo->Description; - Y_VERIFY(entry.PQGroupInfo); topicsIter->second.TabletID = pqDescr.GetBalancerTabletID(); return CheckTopicACL(entry, topicsIter->first, ctx); } diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 3038cc82a6..86f101df9b 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -416,6 +416,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo CloseSession(processResult.Reason, processResult.ErrorCode, ctx); return; } + Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse() auto& description = entry.PQGroupInfo->Description; Y_VERIFY(description.PartitionsSize() > 0); Y_VERIFY(description.HasPQTabletConfig()); @@ -470,6 +471,13 @@ void TWriteSessionActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult:: PersQueue::ErrorCode::ERROR, ctx ); } + if (!navigate->ResultSet.front().PQGroupInfo) { + return CloseSession( + TStringBuilder() << "topic '" << TopicConverter->GetClientsideName() << "' describe error" + << ", reason: could not retrieve topic description", + PersQueue::ErrorCode::ERROR, ctx + ); + } const auto& pqDescription = navigate->ResultSet.front().PQGroupInfo->Description; |