aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-06-06 14:51:40 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:51:40 +0300
commit94a5b9b2d4bb884f5a964c47268f2821a04a334c (patch)
tree411d0ffffb50d54f1692bfef9d9a0c7138d08fdd
parent4b0f8c16ff71970dccb14363516742466369853c (diff)
downloadydb-94a5b9b2d4bb884f5a964c47268f2821a04a334c.tar.gz
merge to 22-2 for logbroker : r9399702, r9399224
fix for not fully created topic or double topics in request KIKIMR-14801 mirrorer: recreate read session instead verify REVIEW: 2505170 x-ydb-stable-ref: cbaa41b7596669f63ab212bae8f82d4d26d1c072
-rw-r--r--contrib/libs/tcmalloc/no_percpu_cache/ya.make0
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp16
-rw-r--r--ydb/core/persqueue/mirrorer.cpp51
-rw-r--r--ydb/core/persqueue/mirrorer.h3
4 files changed, 45 insertions, 25 deletions
diff --git a/contrib/libs/tcmalloc/no_percpu_cache/ya.make b/contrib/libs/tcmalloc/no_percpu_cache/ya.make
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/libs/tcmalloc/no_percpu_cache/ya.make
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index ac3487369f..627d37b5dd 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -324,10 +324,7 @@ bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) {
for (const auto& child : SchemeCacheResponse->ResultSet) {
if (child.Kind == TSchemeCacheNavigate::EKind::KindTopic && child.PQGroupInfo) {
TString name = child.PQGroupInfo->Description.GetName();
- if (name.empty()) {
- name = child.Path.back();
- }
- if (!TopicsToRequest.empty() && !IsIn(TopicsToRequest, name)) {
+ if (name.empty() || !TopicsToRequest.empty() && !IsIn(TopicsToRequest, name)) {
continue;
}
ChildrenToCreate.emplace_back(new TPerTopicInfo(child));
@@ -358,6 +355,17 @@ bool TPersQueueBaseRequestProcessor::CreateChildrenIfNeeded(const TActorContext&
THolder<TPerTopicInfo> perTopicInfo(ChildrenToCreate.front().Release());
ChildrenToCreate.pop_front();
const auto& name = perTopicInfo->TopicEntry.PQGroupInfo->Description.GetName();
+ if (name.empty()) {
+ continue;
+ }
+
+ if (topics.find(name) != topics.end()) {
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "already present topic '" << name << "'");
+ SendErrorReplyAndDie(ctx, MSTATUS_ERROR, NPersQueue::NErrorCode::UNKNOWN_TOPIC,
+ TStringBuilder() << "already present topic '" << name << "' Marker# PQ95");
+ return true;
+ }
+
THolder<IActor> childActor = CreateTopicSubactor(perTopicInfo->TopicEntry, name);
if (childActor.Get() != nullptr) {
const TActorId actorId = ctx.Register(childActor.Release());
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index 2847478044..c576dffb9e 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -97,10 +97,14 @@ void TMirrorer::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
bool TMirrorer::AddToWriteRequest(
NKikimrClient::TPersQueuePartitionRequest& request,
- TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message
+ TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message,
+ bool& incorrectRequest
) {
if (!request.HasCmdWriteOffset()) {
- Y_VERIFY(request.CmdWriteSize() == 0);
+ if (request.CmdWriteSize() > 0) {
+ incorrectRequest = true;
+ return false;
+ }
request.SetCmdWriteOffset(message.GetOffset(0));
}
if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset(0)) {
@@ -326,10 +330,17 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) {
req->SetMessageNo(0);
req->SetCookie(WRITE_REQUEST_COOKIE);
- while (!Queue.empty() && AddToWriteRequest(*req, Queue.front())) {
+ bool incorrectRequest = false;
+ while (!Queue.empty() && AddToWriteRequest(*req, Queue.front(), incorrectRequest)) {
WriteInFlight.emplace_back(std::move(Queue.front()));
Queue.pop_front();
}
+ if (incorrectRequest) {
+ ProcessError(ctx, TStringBuilder() << " incorrect filling of CmdWrite request: "
+ << req->DebugString());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
WriteRequestInFlight = request->Record.GetPartitionRequest();
@@ -537,27 +548,28 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) {
AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages()));
} else if (auto* createStream = std::get_if<TPersQueueReadEvent::TCreatePartitionStreamEvent>(&event.GetRef())) {
- Y_VERIFY_S(
- !PartitionStream,
- MirrorerDescription()
- << " already has stream " << PartitionStream->GetPartitionStreamId()
- << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId()
- );
+ LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
+ MirrorerDescription() << " got create stream event for '" << createStream->DebugString()
+ << " and will set offset=" << OffsetToRead);
+ if (PartitionStream) {
+ ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionStreamId()
+ << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
+
PartitionStream = createStream->GetPartitionStream();
- Y_VERIFY_S(
- Partition == PartitionStream->GetPartitionId(),
- MirrorerDescription()
- << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath()
- << " partition=" << PartitionStream->GetPartitionId()
- );
+ if (Partition != PartitionStream->GetPartitionId()) {
+ ProcessError(ctx, TStringBuilder() << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath()
+ << " partition=" << PartitionStream->GetPartitionId());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
if (OffsetToRead < createStream->GetCommittedOffset()) {
ProcessError(ctx, TStringBuilder() << "stream has commit offset more then partition end offset,"
<< "gap will be created [" << OffsetToRead << ";" << createStream->GetCommittedOffset() << ")"
);
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
- MirrorerDescription() << " got create stream event for '" << createStream->DebugString()
- << " and will set offset=" << OffsetToRead);
createStream->Confirm(OffsetToRead, OffsetToRead);
RequestSourcePartitionStatus();
@@ -567,8 +579,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
PartitionStream.Reset();
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
MirrorerDescription()
- << " got destroy stream event for partition stream id: "
- << destroyStream->GetPartitionStream()->GetPartitionStreamId());
+ << " got destroy stream event: " << destroyStream->DebugString());
} else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionStreamClosedEvent>(&event.GetRef())) {
PartitionStream.Reset();
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h
index ddcbe38f42..cac6451dec 100644
--- a/ydb/core/persqueue/mirrorer.h
+++ b/ydb/core/persqueue/mirrorer.h
@@ -94,7 +94,8 @@ private:
bool AddToWriteRequest(
NKikimrClient::TPersQueuePartitionRequest& request,
- NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message
+ NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message,
+ bool& incorrectRequest
);
void ProcessError(const TActorContext& ctx, const TString& msg);
void ProcessError(const TActorContext& ctx, const TString& msg, const NKikimrClient::TResponse& response);