diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-06-06 14:51:40 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:51:40 +0300 |
commit | 94a5b9b2d4bb884f5a964c47268f2821a04a334c (patch) | |
tree | 411d0ffffb50d54f1692bfef9d9a0c7138d08fdd | |
parent | 4b0f8c16ff71970dccb14363516742466369853c (diff) | |
download | ydb-94a5b9b2d4bb884f5a964c47268f2821a04a334c.tar.gz |
merge to 22-2 for logbroker : r9399702, r9399224
fix for not fully created topic or double topics in request KIKIMR-14801
mirrorer: recreate read session instead verify
REVIEW: 2505170
x-ydb-stable-ref: cbaa41b7596669f63ab212bae8f82d4d26d1c072
-rw-r--r-- | contrib/libs/tcmalloc/no_percpu_cache/ya.make | 0 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 16 | ||||
-rw-r--r-- | ydb/core/persqueue/mirrorer.cpp | 51 | ||||
-rw-r--r-- | ydb/core/persqueue/mirrorer.h | 3 |
4 files changed, 45 insertions, 25 deletions
diff --git a/contrib/libs/tcmalloc/no_percpu_cache/ya.make b/contrib/libs/tcmalloc/no_percpu_cache/ya.make new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/libs/tcmalloc/no_percpu_cache/ya.make diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index ac3487369f..627d37b5dd 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -324,10 +324,7 @@ bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) { for (const auto& child : SchemeCacheResponse->ResultSet) { if (child.Kind == TSchemeCacheNavigate::EKind::KindTopic && child.PQGroupInfo) { TString name = child.PQGroupInfo->Description.GetName(); - if (name.empty()) { - name = child.Path.back(); - } - if (!TopicsToRequest.empty() && !IsIn(TopicsToRequest, name)) { + if (name.empty() || !TopicsToRequest.empty() && !IsIn(TopicsToRequest, name)) { continue; } ChildrenToCreate.emplace_back(new TPerTopicInfo(child)); @@ -358,6 +355,17 @@ bool TPersQueueBaseRequestProcessor::CreateChildrenIfNeeded(const TActorContext& THolder<TPerTopicInfo> perTopicInfo(ChildrenToCreate.front().Release()); ChildrenToCreate.pop_front(); const auto& name = perTopicInfo->TopicEntry.PQGroupInfo->Description.GetName(); + if (name.empty()) { + continue; + } + + if (topics.find(name) != topics.end()) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "already present topic '" << name << "'"); + SendErrorReplyAndDie(ctx, MSTATUS_ERROR, NPersQueue::NErrorCode::UNKNOWN_TOPIC, + TStringBuilder() << "already present topic '" << name << "' Marker# PQ95"); + return true; + } + THolder<IActor> childActor = CreateTopicSubactor(perTopicInfo->TopicEntry, name); if (childActor.Get() != nullptr) { const TActorId actorId = ctx.Register(childActor.Release()); diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index 2847478044..c576dffb9e 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -97,10 +97,14 @@ void TMirrorer::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) bool TMirrorer::AddToWriteRequest( NKikimrClient::TPersQueuePartitionRequest& request, - TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message + TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message, + bool& incorrectRequest ) { if (!request.HasCmdWriteOffset()) { - Y_VERIFY(request.CmdWriteSize() == 0); + if (request.CmdWriteSize() > 0) { + incorrectRequest = true; + return false; + } request.SetCmdWriteOffset(message.GetOffset(0)); } if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset(0)) { @@ -326,10 +330,17 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) { req->SetMessageNo(0); req->SetCookie(WRITE_REQUEST_COOKIE); - while (!Queue.empty() && AddToWriteRequest(*req, Queue.front())) { + bool incorrectRequest = false; + while (!Queue.empty() && AddToWriteRequest(*req, Queue.front(), incorrectRequest)) { WriteInFlight.emplace_back(std::move(Queue.front())); Queue.pop_front(); } + if (incorrectRequest) { + ProcessError(ctx, TStringBuilder() << " incorrect filling of CmdWrite request: " + << req->DebugString()); + ScheduleConsumerCreation(ctx); + return; + } WriteRequestInFlight = request->Record.GetPartitionRequest(); @@ -537,27 +548,28 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) { AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages())); } else if (auto* createStream = std::get_if<TPersQueueReadEvent::TCreatePartitionStreamEvent>(&event.GetRef())) { - Y_VERIFY_S( - !PartitionStream, - MirrorerDescription() - << " already has stream " << PartitionStream->GetPartitionStreamId() - << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId() - ); + LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, + MirrorerDescription() << " got create stream event for '" << createStream->DebugString() + << " and will set offset=" << OffsetToRead); + if (PartitionStream) { + ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionStreamId() + << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId()); + ScheduleConsumerCreation(ctx); + return; + } + PartitionStream = createStream->GetPartitionStream(); - Y_VERIFY_S( - Partition == PartitionStream->GetPartitionId(), - MirrorerDescription() - << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath() - << " partition=" << PartitionStream->GetPartitionId() - ); + if (Partition != PartitionStream->GetPartitionId()) { + ProcessError(ctx, TStringBuilder() << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath() + << " partition=" << PartitionStream->GetPartitionId()); + ScheduleConsumerCreation(ctx); + return; + } if (OffsetToRead < createStream->GetCommittedOffset()) { ProcessError(ctx, TStringBuilder() << "stream has commit offset more then partition end offset," << "gap will be created [" << OffsetToRead << ";" << createStream->GetCommittedOffset() << ")" ); } - LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, - MirrorerDescription() << " got create stream event for '" << createStream->DebugString() - << " and will set offset=" << OffsetToRead); createStream->Confirm(OffsetToRead, OffsetToRead); RequestSourcePartitionStatus(); @@ -567,8 +579,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) PartitionStream.Reset(); LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() - << " got destroy stream event for partition stream id: " - << destroyStream->GetPartitionStream()->GetPartitionStreamId()); + << " got destroy stream event: " << destroyStream->DebugString()); } else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionStreamClosedEvent>(&event.GetRef())) { PartitionStream.Reset(); LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h index ddcbe38f42..cac6451dec 100644 --- a/ydb/core/persqueue/mirrorer.h +++ b/ydb/core/persqueue/mirrorer.h @@ -94,7 +94,8 @@ private: bool AddToWriteRequest( NKikimrClient::TPersQueuePartitionRequest& request, - NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message + NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message, + bool& incorrectRequest ); void ProcessError(const TActorContext& ctx, const TString& msg); void ProcessError(const TActorContext& ctx, const TString& msg, const NKikimrClient::TResponse& response); |