summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <[email protected]>2025-10-13 18:20:49 +0300
committerGitHub <[email protected]>2025-10-13 15:20:49 +0000
commitf9391ba7bbf81b19ddcaf88e9c6970b15323341a (patch)
tree9559a26bd4f82878e3e913396c7d2b96e93db5a6
parent21412cfb592755ec331215b2c3a7a4fcdfd875f4 (diff)
Incorrect WasTheLastBlobBig value at the start of the partition (#26560)
-rw-r--r--ydb/core/persqueue/events/internal.h10
-rw-r--r--ydb/core/persqueue/pqtablet/partition/partition.cpp44
-rw-r--r--ydb/core/persqueue/pqtablet/partition/partition.h5
-rw-r--r--ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp12
-rw-r--r--ydb/core/persqueue/pqtablet/partition/partition_init.cpp2
-rw-r--r--ydb/core/persqueue/pqtablet/pq_impl.cpp18
-rw-r--r--ydb/core/persqueue/pqtablet/pq_impl.h1
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp15
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h7
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp46
10 files changed, 112 insertions, 48 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 4d709af406e..e5cf8e5c0d7 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -198,6 +198,7 @@ struct TEvPQ {
EvRunCompaction,
EvMirrorTopicDescription,
EvBroadcastPartitionError,
+ EvForceCompaction,
EvEnd
};
@@ -1278,6 +1279,15 @@ struct TEvPQ {
ui64 BlobsCount = 0;
};
+
+ struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> {
+ explicit TEvForceCompaction(const ui32 partitionId) :
+ PartitionId(partitionId)
+ {
+ }
+
+ ui32 PartitionId = 0;
+ };
};
} //NKikimr
diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp
index 08dd69def4c..f68c9d9b398 100644
--- a/ydb/core/persqueue/pqtablet/partition/partition.cpp
+++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp
@@ -2970,50 +2970,6 @@ void TPartition::CommitWriteOperations(TTransaction& t)
BlobEncoder.NewHead.Offset = Parameters->CurOffset;
}
- //if (!t.WriteInfo->BlobsFromHead.empty()) {
- // auto& first = t.WriteInfo->BlobsFromHead.front();
- // // In one operation, a partition can write blocks of several transactions. Some of them can be broken down
- // // into parts. We need to take this division into account.
- // BlobEncoder.NewHead.PartNo += first.GetPartNo();
-
- // Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();
-
- // BlobEncoder.NewPartitionedBlob(Partition,
- // BlobEncoder.NewHead.Offset,
- // first.SourceId,
- // first.SeqNo,
- // first.GetTotalParts(),
- // first.GetTotalSize(),
- // Parameters->HeadCleared, // headCleared
- // false, // needCompactHead
- // MaxBlobSize,
- // first.GetPartNo());
-
- // for (auto& blob : t.WriteInfo->BlobsFromHead) {
- // TWriteMsg msg{Max<ui64>(), Nothing(), TEvPQ::TEvWrite::TMsg{
- // .SourceId = blob.SourceId,
- // .SeqNo = blob.SeqNo,
- // .PartNo = (ui16)(blob.PartData ? blob.PartData->PartNo : 0),
- // .TotalParts = (ui16)(blob.PartData ? blob.PartData->TotalParts : 1),
- // .TotalSize = (ui32)(blob.PartData ? blob.PartData->TotalSize : blob.UncompressedSize),
- // .CreateTimestamp = blob.CreateTimestamp.MilliSeconds(),
- // .ReceiveTimestamp = blob.CreateTimestamp.MilliSeconds(),
- // .DisableDeduplication = false,
- // .WriteTimestamp = blob.WriteTimestamp.MilliSeconds(),
- // .Data = blob.Data,
- // .UncompressedSize = blob.UncompressedSize,
- // .PartitionKey = blob.PartitionKey,
- // .ExplicitHashKey = blob.ExplicitHashKey,
- // .External = false,
- // .IgnoreQuotaDeadline = true,
- // .HeartbeatVersion = std::nullopt,
- // }, std::nullopt};
- // msg.Internal = true;
-
- // WriteInflightSize += msg.Msg.Data.size();
- // ExecRequest(msg, *Parameters, PersistRequest.Get());
- // }
- //}
for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) {
auto& sourceIdBatch = Parameters->SourceIdBatch;
auto sourceId = sourceIdBatch.GetSource(srcId);
diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h
index dac19163689..1caaf000e22 100644
--- a/ydb/core/persqueue/pqtablet/partition/partition.h
+++ b/ydb/core/persqueue/pqtablet/partition/partition.h
@@ -246,6 +246,7 @@ private:
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
+ void Handle(TEvPQ::TEvForceCompaction::TPtr& ev);
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
@@ -596,6 +597,7 @@ private:
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
+ hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
if (!Initializer.Handle(ev)) {
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -664,6 +666,7 @@ private:
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
+ hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
break;
@@ -1116,7 +1119,7 @@ private:
const TEvPQ::TEvBlobResponse* blobResponse,
const TActorContext& ctx);
- void TryRunCompaction();
+ void TryRunCompaction(bool force = false);
void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs);
void BlobsForCompactionWereWrite();
ui64 NextReadCookie();
diff --git a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp
index f3bb7935856..34f50abc337 100644
--- a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp
+++ b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp
@@ -164,7 +164,7 @@ void TPartition::DumpKeysForBlobsCompaction() const
LOG_D("===================================");
}
-void TPartition::TryRunCompaction()
+void TPartition::TryRunCompaction(bool force)
{
if (StopCompaction) {
LOG_D("Blobs compaction is stopped");
@@ -186,7 +186,7 @@ void TPartition::TryRunCompaction()
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();
- if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) {
+ if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit()) && !force) {
LOG_D("No data for blobs compaction");
return;
}
@@ -207,6 +207,7 @@ void TPartition::TryRunCompaction()
LOG_D("Blob key for rename " << k.Key.ToString());
}
}
+
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes");
CompactionInProgress = true;
@@ -214,6 +215,11 @@ void TPartition::TryRunCompaction()
Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount));
}
+void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&)
+{
+ TryRunCompaction(true);
+}
+
void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev)
{
const ui64 blobsCount = ev->Get()->BlobsCount;
@@ -337,7 +343,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k,
if (!CompactionBlobEncoder.PartitionedBlob.IsInited()) {
CompactionBlobEncoder.NewPartitionedBlob(Partition,
- CompactionBlobEncoder.NewHead.Offset,
+ parameters.CurOffset,
"", // SourceId
0, // SeqNo
0, // TotalParts
diff --git a/ydb/core/persqueue/pqtablet/partition/partition_init.cpp b/ydb/core/persqueue/pqtablet/partition/partition_init.cpp
index 7d1a395a90b..214dccaf96e 100644
--- a/ydb/core/persqueue/pqtablet/partition/partition_init.cpp
+++ b/ydb/core/persqueue/pqtablet/partition/partition_init.cpp
@@ -778,6 +778,8 @@ void TInitDataRangeStep::FormHeadAndProceed() {
cz.Head.Offset = headKey.GetOffset();
cz.Head.PartNo = headKey.GetPartNo();
+
+ Partition()->WasTheLastBlobBig = false;
}
// FastWrite Body
diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp
index b837dc82fc7..f54719f7454 100644
--- a/ydb/core/persqueue/pqtablet/pq_impl.cpp
+++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp
@@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents()
}
}
+void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx)
+{
+ PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)");
+
+ const auto& event = *ev->Get();
+ const TPartitionId partitionId(event.PartitionId);
+
+ if (!Partitions.contains(partitionId)) {
+ PQ_LOG_D("Unknown partition id " << event.PartitionId);
+ return;
+ }
+
+ auto p = Partitions.find(partitionId);
+ ctx.Send(p->second.Actor,
+ new TEvPQ::TEvForceCompaction(event.PartitionId));
+}
+
bool TPersQueue::HandleHook(STFUNC_SIG)
{
TRACE_EVENT(NKikimrServices::PERSQUEUE);
@@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
+ HFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pqtablet/pq_impl.h b/ydb/core/persqueue/pqtablet/pq_impl.h
index 210734d59ac..bafc37ac98f 100644
--- a/ydb/core/persqueue/pqtablet/pq_impl.h
+++ b/ydb/core/persqueue/pqtablet/pq_impl.h
@@ -603,6 +603,7 @@ private:
void ResendSplitMergeRequests(const TActorContext& ctx);
+ void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx);
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
NWilson::TSpan WriteTxsSpan;
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index 8f51bbe685b..39b17e3e3af 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -1250,4 +1250,19 @@ THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(T
return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2));
}
+void CmdRunCompaction(TTestActorRuntime& runtime,
+ ui64 tabletId,
+ const TActorId& sender,
+ const ui32 partition)
+{
+ auto event = MakeHolder<TEvPQ::TEvForceCompaction>(partition);
+ runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries());
+}
+
+void CmdRunCompaction(const ui32 partition,
+ TTestContext& tc)
+{
+ CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition);
+}
+
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 21034932e1c..7682c4d1262 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -644,6 +644,13 @@ struct TCmdWriteOptions {
};
void CmdWrite(const TCmdWriteOptions&);
+void CmdRunCompaction(TTestActorRuntime& runtime,
+ ui64 tabletId,
+ const TActorId& sender,
+ const ui32 partition);
+void CmdRunCompaction(const ui32 partition,
+ TTestContext& tc);
+
THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index 78b617361b9..80b8230fa65 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -52,6 +52,52 @@ TMaybe<ui64> PQGetStartOffset(TTestContext& tc)
return Nothing();
}
+Y_UNIT_TEST(TestCompaction) {
+ TTestContext tc;
+ tc.EnableDetailedPQLog = true;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ activeZone = false;
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
+ tc.Runtime->SetScheduledLimit(1000);
+
+ ui32 sourceIdx = 0;
+ auto cmdWrite = [&](const TVector<size_t>& sizes) {
+ TVector<std::pair<ui64, TString>> data;
+ for (size_t k = 1; k <= sizes.size(); ++k) {
+ data.emplace_back(k, TString(sizes[k - 1], 'x'));
+ }
+ TString sourceId = "sourceid_" + ToString(sourceIdx++);
+ CmdWrite(0, sourceId, data, tc, false, {}, false, "", -1, -1, false, false, true);
+ };
+ auto cmdCompaction = [&]() {
+ CmdRunCompaction(0, tc);
+ };
+
+ PQTabletPrepare({.partitions = 1, .writeSpeed = 50_MB}, {{"user1", true}}, tc);
+
+ cmdWrite({17400_KB});
+ cmdCompaction();
+
+ cmdWrite({16800_KB});
+ cmdCompaction();
+
+ PQTabletRestart(tc);
+
+ cmdWrite({7000_KB, 13300_KB});
+ cmdCompaction();
+
+ cmdWrite({1_KB});
+
+ PQTabletRestart(tc);
+
+ PQGetPartInfo(0, 4 + 1, tc);
+ });
+}
+
Y_UNIT_TEST(TestCmdReadWithLastOffset) {
TTestContext tc;
tc.EnableDetailedPQLog = true;