aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-11-01 09:00:56 +0300
committertesseract <tesseract@yandex-team.com>2023-11-01 09:21:41 +0300
commitb20aa0ca2385dad33e357649643a944c2f678557 (patch)
tree3bddfb417ed6830692251c40abe89aae42d86441
parent7c7f3222f32903c37ee4c5790635e9e03f687417 (diff)
downloadydb-b20aa0ca2385dad33e357649643a944c2f678557.tar.gz
Split the method to parts which processing one message type
-rw-r--r--ydb/core/persqueue/partition.h21
-rw-r--r--ydb/core/persqueue/partition_write.cpp164
2 files changed, 116 insertions, 69 deletions
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index e59a26c9a7..9709d6b973 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -525,6 +525,27 @@ private:
}
private:
+ struct ProcessParameters {
+ ProcessParameters(TSourceIdWriter& sourceIdWriter,
+ THeartbeatEmitter& heartbeatEmitter)
+ : SourceIdWriter(sourceIdWriter)
+ , HeartbeatEmitter(heartbeatEmitter) {
+ }
+
+ TSourceIdWriter& SourceIdWriter;
+ THeartbeatEmitter& HeartbeatEmitter;
+
+ ui64 CurOffset;
+ bool OldPartsCleared;
+ bool HeadCleared;
+ };
+
+ bool ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
+ bool ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
+ bool ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
+ bool ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
+
+private:
ui64 TabletID;
ui32 Partition;
NKikimrPQ::TPQTabletConfig Config;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 7bac53df95..1b44e31cb2 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -800,75 +800,48 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
WriteCycleSize = 0;
}
-bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
- TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter)
-{
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition);
-
- ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset;
-
- WriteCycleSize = 0;
- WriteNewSize = 0;
- WriteNewSizeUncompressed = 0;
- WriteNewMessages = 0;
- UpdateWriteBufferIsFullState(ctx.Now());
- CurrentTimestamp = ctx.Now();
-
- NewHead.Offset = EndOffset;
- NewHead.PartNo = 0;
- NewHead.PackedSize = 0;
-
- Y_ABORT_UNLESS(NewHead.Batches.empty());
-
- bool oldPartsCleared = false;
- bool headCleared = (Head.PackedSize == 0);
+bool TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters) {
+ auto& body = msg.Body;
+ TMaybe<TPartitionKeyRange> keyRange;
+ if (body.KeyRange) {
+ keyRange = TPartitionKeyRange::Parse(*body.KeyRange);
+ }
- //TODO: Process here not TClientBlobs, but also TBatches from LB(LB got them from pushclient too)
- //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs
- //otherwise write this batch as is to head;
+ body.AssignedOffset = parameters.CurOffset;
+ parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
- while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big
- auto pp = Requests.front();
- Requests.pop_front();
+ return true;
+}
- if (!pp.IsWrite()) {
- if (pp.IsRegisterMessageGroup()) {
- auto& body = pp.GetRegisterMessageGroup().Body;
+bool TPartition::ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters) {
+ parameters.SourceIdWriter.DeregisterSourceId(msg.Body.SourceId);
- TMaybe<TPartitionKeyRange> keyRange;
- if (body.KeyRange) {
- keyRange = TPartitionKeyRange::Parse(*body.KeyRange);
- }
+ return true;
+}
- body.AssignedOffset = curOffset;
- sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange));
- } else if (pp.IsDeregisterMessageGroup()) {
- sourceIdWriter.DeregisterSourceId(pp.GetDeregisterMessageGroup().Body.SourceId);
- } else if (pp.IsSplitMessageGroup()) {
- for (auto& body : pp.GetSplitMessageGroup().Deregistrations) {
- sourceIdWriter.DeregisterSourceId(body.SourceId);
- }
+bool TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters) {
+ for (auto& body : msg.Deregistrations) {
+ parameters.SourceIdWriter.DeregisterSourceId(body.SourceId);
+ }
- for (auto& body : pp.GetSplitMessageGroup().Registrations) {
- TMaybe<TPartitionKeyRange> keyRange;
- if (body.KeyRange) {
- keyRange = TPartitionKeyRange::Parse(*body.KeyRange);
- }
+ for (auto& body : msg.Registrations) {
+ TMaybe<TPartitionKeyRange> keyRange;
+ if (body.KeyRange) {
+ keyRange = TPartitionKeyRange::Parse(*body.KeyRange);
+ }
- body.AssignedOffset = curOffset;
- sourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, curOffset, CurrentTimestamp, std::move(keyRange), true);
- }
- } else {
- Y_ABORT_UNLESS(pp.IsOwnership());
- }
+ body.AssignedOffset = parameters.CurOffset;
+ parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
+ }
- EmplaceResponse(std::move(pp), ctx);
- continue;
- }
+ return true;
+}
- Y_ABORT_UNLESS(pp.IsWrite());
- auto& p = pp.GetWrite();
+bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
+ ui64& curOffset = parameters.CurOffset;
+ TSourceIdWriter& sourceIdWriter = parameters.SourceIdWriter;
+ THeartbeatEmitter& heartbeatEmitter = parameters.HeartbeatEmitter;
WriteInflightSize -= p.Msg.Data.size();
@@ -902,8 +875,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
TString().swap(p.Msg.Data);
- EmplaceResponse(std::move(pp), ctx);
- continue;
+ return true;
}
if (const auto& hbVersion = p.Msg.HeartbeatVersion) {
@@ -935,8 +907,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat)
));
- EmplaceResponse(std::move(pp), ctx);
- continue;
+ return true;
}
if (poffset < curOffset) { //too small offset
@@ -964,8 +935,8 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
if (p.Msg.PartNo == 0) { //create new PartitionedBlob
//there could be parts from previous owner, clear them
- if (!oldPartsCleared) {
- oldPartsCleared = true;
+ if (!parameters.OldPartsCleared) {
+ parameters.OldPartsCleared = true;
auto del = request->Record.AddCmdDeleteRange();
auto range = del->MutableRange();
TKeyPrefix from(TKeyPrefix::TypeTmpData, Partition);
@@ -987,7 +958,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
PartitionedBlob = TPartitionedBlob(Partition, curOffset, p.Msg.SourceId, p.Msg.SeqNo,
p.Msg.TotalParts, p.Msg.TotalSize, Head, NewHead,
- headCleared, needCompactHead, MaxBlobSize);
+ parameters.HeadCleared, needCompactHead, MaxBlobSize);
}
LOG_DEBUG_S(
@@ -1099,7 +1070,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
CompactedKeys.back().first.SetType(TKeyPrefix::TypeData);
}
if (PartitionedBlob.HasFormedBlobs()) { //Head and newHead are cleared
- headCleared = true;
+ parameters.HeadCleared = true;
NewHead.Clear();
NewHead.Offset = PartitionedBlob.GetOffset();
NewHead.PartNo = PartitionedBlob.GetHeadPartNo();
@@ -1145,7 +1116,62 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
++curOffset;
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
}
+
TString().swap(p.Msg.Data);
+
+ return true;
+}
+
+bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
+ TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter)
+{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition);
+
+ ProcessParameters parameters(sourceIdWriter, heartbeatEmitter);
+ parameters.CurOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset;
+
+ WriteCycleSize = 0;
+ WriteNewSize = 0;
+ WriteNewSizeUncompressed = 0;
+ WriteNewMessages = 0;
+ UpdateWriteBufferIsFullState(ctx.Now());
+ CurrentTimestamp = ctx.Now();
+
+ NewHead.Offset = EndOffset;
+ NewHead.PartNo = 0;
+ NewHead.PackedSize = 0;
+
+ Y_ABORT_UNLESS(NewHead.Batches.empty());
+
+ parameters.OldPartsCleared = false;
+ parameters.HeadCleared = (Head.PackedSize == 0);
+
+
+ //TODO: Process here not TClientBlobs, but also TBatches from LB(LB got them from pushclient too)
+ //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs
+ //otherwise write this batch as is to head;
+
+ while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big
+ auto pp = Requests.front();
+ Requests.pop_front();
+
+ bool processed = true;
+ if (pp.IsWrite()) {
+ processed = ProcessRequest(pp.GetWrite(), parameters, request, ctx);
+ } else if (pp.IsRegisterMessageGroup()) {
+ processed = ProcessRequest(pp.GetRegisterMessageGroup(), parameters);
+ } else if (pp.IsDeregisterMessageGroup()) {
+ processed = ProcessRequest(pp.GetDeregisterMessageGroup(), parameters);
+ } else if (pp.IsSplitMessageGroup()) {
+ processed = ProcessRequest(pp.GetSplitMessageGroup(), parameters);
+ } else {
+ Y_ABORT_UNLESS(pp.IsOwnership());
+ }
+
+ if (!processed) {
+ return false;
+ }
+
EmplaceResponse(std::move(pp), ctx);
}
@@ -1159,10 +1185,10 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
}
- Y_ABORT_UNLESS((headCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl
+ Y_ABORT_UNLESS((parameters.HeadCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl
MaxWriteResponsesSize = Max<ui32>(MaxWriteResponsesSize, Responses.size());
- return headCleared;
+ return parameters.HeadCleared;
}