diff options
author | Alek5andr-Kotov <[email protected]> | 2025-10-13 18:20:49 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-10-13 15:20:49 +0000 |
commit | f9391ba7bbf81b19ddcaf88e9c6970b15323341a (patch) | |
tree | 9559a26bd4f82878e3e913396c7d2b96e93db5a6 | |
parent | 21412cfb592755ec331215b2c3a7a4fcdfd875f4 (diff) |
Incorrect WasTheLastBlobBig value at the start of the partition (#26560)
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/pqtablet/partition/partition.cpp | 44 | ||||
-rw-r--r-- | ydb/core/persqueue/pqtablet/partition/partition.h | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/pqtablet/partition/partition_init.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/pqtablet/pq_impl.cpp | 18 | ||||
-rw-r--r-- | ydb/core/persqueue/pqtablet/pq_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 15 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.h | 7 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pq_ut.cpp | 46 |
10 files changed, 112 insertions, 48 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 4d709af406e..e5cf8e5c0d7 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -198,6 +198,7 @@ struct TEvPQ { EvRunCompaction, EvMirrorTopicDescription, EvBroadcastPartitionError, + EvForceCompaction, EvEnd }; @@ -1278,6 +1279,15 @@ struct TEvPQ { ui64 BlobsCount = 0; }; + + struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> { + explicit TEvForceCompaction(const ui32 partitionId) : + PartitionId(partitionId) + { + } + + ui32 PartitionId = 0; + }; }; } //NKikimr diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 08dd69def4c..f68c9d9b398 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -2970,50 +2970,6 @@ void TPartition::CommitWriteOperations(TTransaction& t) BlobEncoder.NewHead.Offset = Parameters->CurOffset; } - //if (!t.WriteInfo->BlobsFromHead.empty()) { - // auto& first = t.WriteInfo->BlobsFromHead.front(); - // // In one operation, a partition can write blocks of several transactions. Some of them can be broken down - // // into parts. We need to take this division into account. - // BlobEncoder.NewHead.PartNo += first.GetPartNo(); - - // Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty(); - - // BlobEncoder.NewPartitionedBlob(Partition, - // BlobEncoder.NewHead.Offset, - // first.SourceId, - // first.SeqNo, - // first.GetTotalParts(), - // first.GetTotalSize(), - // Parameters->HeadCleared, // headCleared - // false, // needCompactHead - // MaxBlobSize, - // first.GetPartNo()); - - // for (auto& blob : t.WriteInfo->BlobsFromHead) { - // TWriteMsg msg{Max<ui64>(), Nothing(), TEvPQ::TEvWrite::TMsg{ - // .SourceId = blob.SourceId, - // .SeqNo = blob.SeqNo, - // .PartNo = (ui16)(blob.PartData ? blob.PartData->PartNo : 0), - // .TotalParts = (ui16)(blob.PartData ? blob.PartData->TotalParts : 1), - // .TotalSize = (ui32)(blob.PartData ? blob.PartData->TotalSize : blob.UncompressedSize), - // .CreateTimestamp = blob.CreateTimestamp.MilliSeconds(), - // .ReceiveTimestamp = blob.CreateTimestamp.MilliSeconds(), - // .DisableDeduplication = false, - // .WriteTimestamp = blob.WriteTimestamp.MilliSeconds(), - // .Data = blob.Data, - // .UncompressedSize = blob.UncompressedSize, - // .PartitionKey = blob.PartitionKey, - // .ExplicitHashKey = blob.ExplicitHashKey, - // .External = false, - // .IgnoreQuotaDeadline = true, - // .HeartbeatVersion = std::nullopt, - // }, std::nullopt}; - // msg.Internal = true; - - // WriteInflightSize += msg.Msg.Data.size(); - // ExecRequest(msg, *Parameters, PersistRequest.Get()); - // } - //} for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) { auto& sourceIdBatch = Parameters->SourceIdBatch; auto sourceId = sourceIdBatch.GetSource(srcId); diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h index dac19163689..1caaf000e22 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.h +++ b/ydb/core/persqueue/pqtablet/partition/partition.h @@ -246,6 +246,7 @@ private: void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvRunCompaction::TPtr& ev); + void Handle(TEvPQ::TEvForceCompaction::TPtr& ev); void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev); void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx); void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx); @@ -596,6 +597,7 @@ private: HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit); IgnoreFunc(TEvPQ::TEvTxBatchComplete); hFuncTraced(TEvPQ::TEvRunCompaction, Handle); + hFuncTraced(TEvPQ::TEvForceCompaction, Handle); default: if (!Initializer.Handle(ev)) { ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev)); @@ -664,6 +666,7 @@ private: HFuncTraced(TEvPQ::TEvDeletePartition, Handle); IgnoreFunc(TEvPQ::TEvTxBatchComplete); hFuncTraced(TEvPQ::TEvRunCompaction, Handle); + hFuncTraced(TEvPQ::TEvForceCompaction, Handle); default: ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev)); break; @@ -1116,7 +1119,7 @@ private: const TEvPQ::TEvBlobResponse* blobResponse, const TActorContext& ctx); - void TryRunCompaction(); + void TryRunCompaction(bool force = false); void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs); void BlobsForCompactionWereWrite(); ui64 NextReadCookie(); diff --git a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp index f3bb7935856..34f50abc337 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp @@ -164,7 +164,7 @@ void TPartition::DumpKeysForBlobsCompaction() const LOG_D("==================================="); } -void TPartition::TryRunCompaction() +void TPartition::TryRunCompaction(bool force) { if (StopCompaction) { LOG_D("Blobs compaction is stopped"); @@ -186,7 +186,7 @@ void TPartition::TryRunCompaction() const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit(); const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound(); - if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) { + if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit()) && !force) { LOG_D("No data for blobs compaction"); return; } @@ -207,6 +207,7 @@ void TPartition::TryRunCompaction() LOG_D("Blob key for rename " << k.Key.ToString()); } } + LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes"); CompactionInProgress = true; @@ -214,6 +215,11 @@ void TPartition::TryRunCompaction() Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount)); } +void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&) +{ + TryRunCompaction(true); +} + void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev) { const ui64 blobsCount = ev->Get()->BlobsCount; @@ -337,7 +343,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k, if (!CompactionBlobEncoder.PartitionedBlob.IsInited()) { CompactionBlobEncoder.NewPartitionedBlob(Partition, - CompactionBlobEncoder.NewHead.Offset, + parameters.CurOffset, "", // SourceId 0, // SeqNo 0, // TotalParts diff --git a/ydb/core/persqueue/pqtablet/partition/partition_init.cpp b/ydb/core/persqueue/pqtablet/partition/partition_init.cpp index 7d1a395a90b..214dccaf96e 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_init.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_init.cpp @@ -778,6 +778,8 @@ void TInitDataRangeStep::FormHeadAndProceed() { cz.Head.Offset = headKey.GetOffset(); cz.Head.PartNo = headKey.GetPartNo(); + + Partition()->WasTheLastBlobBig = false; } // FastWrite Body diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index b837dc82fc7..f54719f7454 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents() } } +void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx) +{ + PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)"); + + const auto& event = *ev->Get(); + const TPartitionId partitionId(event.PartitionId); + + if (!Partitions.contains(partitionId)) { + PQ_LOG_D("Unknown partition id " << event.PartitionId); + return; + } + + auto p = Partitions.find(partitionId); + ctx.Send(p->second.Actor, + new TEvPQ::TEvForceCompaction(event.PartitionId)); +} + bool TPersQueue::HandleHook(STFUNC_SIG) { TRACE_EVENT(NKikimrServices::PERSQUEUE); @@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle); HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle); HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle); + HFuncTraced(TEvPQ::TEvForceCompaction, Handle); default: return false; } diff --git a/ydb/core/persqueue/pqtablet/pq_impl.h b/ydb/core/persqueue/pqtablet/pq_impl.h index 210734d59ac..bafc37ac98f 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.h +++ b/ydb/core/persqueue/pqtablet/pq_impl.h @@ -603,6 +603,7 @@ private: void ResendSplitMergeRequests(const TActorContext& ctx); + void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx); TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl; NWilson::TSpan WriteTxsSpan; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 8f51bbe685b..39b17e3e3af 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -1250,4 +1250,19 @@ THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(T return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2)); } +void CmdRunCompaction(TTestActorRuntime& runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition) +{ + auto event = MakeHolder<TEvPQ::TEvForceCompaction>(partition); + runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries()); +} + +void CmdRunCompaction(const ui32 partition, + TTestContext& tc) +{ + CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition); +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 21034932e1c..7682c4d1262 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -644,6 +644,13 @@ struct TCmdWriteOptions { }; void CmdWrite(const TCmdWriteOptions&); +void CmdRunCompaction(TTestActorRuntime& runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition); +void CmdRunCompaction(const ui32 partition, + TTestContext& tc); + THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId); } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 78b617361b9..80b8230fa65 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -52,6 +52,52 @@ TMaybe<ui64> PQGetStartOffset(TTestContext& tc) return Nothing(); } +Y_UNIT_TEST(TestCompaction) { + TTestContext tc; + tc.EnableDetailedPQLog = true; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + activeZone = false; + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(1000); + + ui32 sourceIdx = 0; + auto cmdWrite = [&](const TVector<size_t>& sizes) { + TVector<std::pair<ui64, TString>> data; + for (size_t k = 1; k <= sizes.size(); ++k) { + data.emplace_back(k, TString(sizes[k - 1], 'x')); + } + TString sourceId = "sourceid_" + ToString(sourceIdx++); + CmdWrite(0, sourceId, data, tc, false, {}, false, "", -1, -1, false, false, true); + }; + auto cmdCompaction = [&]() { + CmdRunCompaction(0, tc); + }; + + PQTabletPrepare({.partitions = 1, .writeSpeed = 50_MB}, {{"user1", true}}, tc); + + cmdWrite({17400_KB}); + cmdCompaction(); + + cmdWrite({16800_KB}); + cmdCompaction(); + + PQTabletRestart(tc); + + cmdWrite({7000_KB, 13300_KB}); + cmdCompaction(); + + cmdWrite({1_KB}); + + PQTabletRestart(tc); + + PQGetPartInfo(0, 4 + 1, tc); + }); +} + Y_UNIT_TEST(TestCmdReadWithLastOffset) { TTestContext tc; tc.EnableDetailedPQLog = true; |