diff options
author | tesseract <tesseract@yandex-team.com> | 2023-11-01 09:00:56 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-11-01 09:21:41 +0300 |
commit | b20aa0ca2385dad33e357649643a944c2f678557 (patch) | |
tree | 3bddfb417ed6830692251c40abe89aae42d86441 | |
parent | 7c7f3222f32903c37ee4c5790635e9e03f687417 (diff) | |
download | ydb-b20aa0ca2385dad33e357649643a944c2f678557.tar.gz |
Split the method to parts which processing one message type
-rw-r--r-- | ydb/core/persqueue/partition.h | 21 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 164 |
2 files changed, 116 insertions, 69 deletions
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index e59a26c9a7..9709d6b973 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -525,6 +525,27 @@ private: } private: + struct ProcessParameters { + ProcessParameters(TSourceIdWriter& sourceIdWriter, + THeartbeatEmitter& heartbeatEmitter) + : SourceIdWriter(sourceIdWriter) + , HeartbeatEmitter(heartbeatEmitter) { + } + + TSourceIdWriter& SourceIdWriter; + THeartbeatEmitter& HeartbeatEmitter; + + ui64 CurOffset; + bool OldPartsCleared; + bool HeadCleared; + }; + + bool ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters); + bool ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters); + bool ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); + bool ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx); + +private: ui64 TabletID; ui32 Partition; NKikimrPQ::TPQTabletConfig Config; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 7bac53df95..1b44e31cb2 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -800,75 +800,48 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T WriteCycleSize = 0; } -bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, - TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter) -{ - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition); - - ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; - - WriteCycleSize = 0; - WriteNewSize = 0; - WriteNewSizeUncompressed = 0; - WriteNewMessages = 0; - UpdateWriteBufferIsFullState(ctx.Now()); - CurrentTimestamp = ctx.Now(); - - NewHead.Offset = EndOffset; - NewHead.PartNo = 0; - NewHead.PackedSize = 0; - - Y_ABORT_UNLESS(NewHead.Batches.empty()); - - bool oldPartsCleared = false; - bool headCleared = (Head.PackedSize == 0); +bool TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters) { + auto& body = msg.Body; + TMaybe<TPartitionKeyRange> keyRange; + if (body.KeyRange) { + keyRange = TPartitionKeyRange::Parse(*body.KeyRange); + } - //TODO: Process here not TClientBlobs, but also TBatches from LB(LB got them from pushclient too) - //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs - //otherwise write this batch as is to head; + body.AssignedOffset = parameters.CurOffset; + parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); - while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big - auto pp = Requests.front(); - Requests.pop_front(); + return true; +} - if (!pp.IsWrite()) { - if (pp.IsRegisterMessageGroup()) { - auto& body = pp.GetRegisterMessageGroup().Body; +bool TPartition::ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters) { + parameters.SourceIdWriter.DeregisterSourceId(msg.Body.SourceId); - TMaybe<TPartitionKeyRange> keyRange; - if (body.KeyRange) { - keyRange = TPartitionKeyRange::Parse(*body.KeyRange); - } + return true; +} - body.AssignedOffset = curOffset; - sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange)); - } else if (pp.IsDeregisterMessageGroup()) { - sourceIdWriter.DeregisterSourceId(pp.GetDeregisterMessageGroup().Body.SourceId); - } else if (pp.IsSplitMessageGroup()) { - for (auto& body : pp.GetSplitMessageGroup().Deregistrations) { - sourceIdWriter.DeregisterSourceId(body.SourceId); - } +bool TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters) { + for (auto& body : msg.Deregistrations) { + parameters.SourceIdWriter.DeregisterSourceId(body.SourceId); + } - for (auto& body : pp.GetSplitMessageGroup().Registrations) { - TMaybe<TPartitionKeyRange> keyRange; - if (body.KeyRange) { - keyRange = TPartitionKeyRange::Parse(*body.KeyRange); - } + for (auto& body : msg.Registrations) { + TMaybe<TPartitionKeyRange> keyRange; + if (body.KeyRange) { + keyRange = TPartitionKeyRange::Parse(*body.KeyRange); + } - body.AssignedOffset = curOffset; - sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange), true); - } - } else { - Y_ABORT_UNLESS(pp.IsOwnership()); - } + body.AssignedOffset = parameters.CurOffset; + parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); + } - EmplaceResponse(std::move(pp), ctx); - continue; - } + return true; +} - Y_ABORT_UNLESS(pp.IsWrite()); - auto& p = pp.GetWrite(); +bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx) { + ui64& curOffset = parameters.CurOffset; + TSourceIdWriter& sourceIdWriter = parameters.SourceIdWriter; + THeartbeatEmitter& heartbeatEmitter = parameters.HeartbeatEmitter; WriteInflightSize -= p.Msg.Data.size(); @@ -902,8 +875,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const } TString().swap(p.Msg.Data); - EmplaceResponse(std::move(pp), ctx); - continue; + return true; } if (const auto& hbVersion = p.Msg.HeartbeatVersion) { @@ -935,8 +907,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat) )); - EmplaceResponse(std::move(pp), ctx); - continue; + return true; } if (poffset < curOffset) { //too small offset @@ -964,8 +935,8 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const if (p.Msg.PartNo == 0) { //create new PartitionedBlob //there could be parts from previous owner, clear them - if (!oldPartsCleared) { - oldPartsCleared = true; + if (!parameters.OldPartsCleared) { + parameters.OldPartsCleared = true; auto del = request->Record.AddCmdDeleteRange(); auto range = del->MutableRange(); TKeyPrefix from(TKeyPrefix::TypeTmpData, Partition); @@ -987,7 +958,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const } PartitionedBlob = TPartitionedBlob(Partition, curOffset, p.Msg.SourceId, p.Msg.SeqNo, p.Msg.TotalParts, p.Msg.TotalSize, Head, NewHead, - headCleared, needCompactHead, MaxBlobSize); + parameters.HeadCleared, needCompactHead, MaxBlobSize); } LOG_DEBUG_S( @@ -1099,7 +1070,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const CompactedKeys.back().first.SetType(TKeyPrefix::TypeData); } if (PartitionedBlob.HasFormedBlobs()) { //Head and newHead are cleared - headCleared = true; + parameters.HeadCleared = true; NewHead.Clear(); NewHead.Offset = PartitionedBlob.GetOffset(); NewHead.PartNo = PartitionedBlob.GetHeadPartNo(); @@ -1145,7 +1116,62 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const ++curOffset; PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); } + TString().swap(p.Msg.Data); + + return true; +} + +bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, + TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter) +{ + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition); + + ProcessParameters parameters(sourceIdWriter, heartbeatEmitter); + parameters.CurOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; + + WriteCycleSize = 0; + WriteNewSize = 0; + WriteNewSizeUncompressed = 0; + WriteNewMessages = 0; + UpdateWriteBufferIsFullState(ctx.Now()); + CurrentTimestamp = ctx.Now(); + + NewHead.Offset = EndOffset; + NewHead.PartNo = 0; + NewHead.PackedSize = 0; + + Y_ABORT_UNLESS(NewHead.Batches.empty()); + + parameters.OldPartsCleared = false; + parameters.HeadCleared = (Head.PackedSize == 0); + + + //TODO: Process here not TClientBlobs, but also TBatches from LB(LB got them from pushclient too) + //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs + //otherwise write this batch as is to head; + + while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big + auto pp = Requests.front(); + Requests.pop_front(); + + bool processed = true; + if (pp.IsWrite()) { + processed = ProcessRequest(pp.GetWrite(), parameters, request, ctx); + } else if (pp.IsRegisterMessageGroup()) { + processed = ProcessRequest(pp.GetRegisterMessageGroup(), parameters); + } else if (pp.IsDeregisterMessageGroup()) { + processed = ProcessRequest(pp.GetDeregisterMessageGroup(), parameters); + } else if (pp.IsSplitMessageGroup()) { + processed = ProcessRequest(pp.GetSplitMessageGroup(), parameters); + } else { + Y_ABORT_UNLESS(pp.IsOwnership()); + } + + if (!processed) { + return false; + } + EmplaceResponse(std::move(pp), ctx); } @@ -1159,10 +1185,10 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize(); } - Y_ABORT_UNLESS((headCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl + Y_ABORT_UNLESS((parameters.HeadCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl MaxWriteResponsesSize = Max<ui32>(MaxWriteResponsesSize, Responses.size()); - return headCleared; + return parameters.HeadCleared; } |