diff options
| -rw-r--r-- | ydb/core/persqueue/events/internal.h | 1 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition.h | 6 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition_compaction.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 11 | ||||
| -rw-r--r-- | ydb/core/persqueue/read.h | 1 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/pq_ut.cpp | 86 |
6 files changed, 112 insertions, 1 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index a815ce54a1d..4f274aba918 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -57,6 +57,7 @@ namespace NPQ { TString Value; bool Cached; TKey Key; + ui64 CreationUnixTime = 0; TRequestedBlob() = delete; diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index e05a3c926b1..ec89b341309 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -1004,6 +1004,10 @@ private: void AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite, TEvKeyValue::TEvRequest* request, + ui64 creationUnixTime, + const TActorContext& ctx); + void AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite, + TEvKeyValue::TEvRequest* request, const TActorContext& ctx); void RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFormedBlobInfo>& formedBlobs, TProcessParametersBase& parameters, @@ -1054,6 +1058,8 @@ private: TInstant GetFirstUncompactedBlobTimestamp() const; void TryCorrectStartOffset(TMaybe<ui64> offset); + + ui64 LastRequestedBlobCreationUnixTime = 0; }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_compaction.cpp b/ydb/core/persqueue/partition_compaction.cpp index a49c597427d..6c8f117340c 100644 --- a/ydb/core/persqueue/partition_compaction.cpp +++ b/ydb/core/persqueue/partition_compaction.cpp @@ -85,7 +85,7 @@ bool TPartition::ExecRequestForCompaction(TWriteMsg& p, TProcessParametersBase& auto newWrite = CompactionBlobEncoder.PartitionedBlob.Add(std::move(blob)); if (newWrite && !newWrite->Value.empty()) { - AddCmdWrite(newWrite, request, ctx); + AddCmdWrite(newWrite, request, LastRequestedBlobCreationUnixTime, ctx); PQ_LOG_D("Topic '" << TopicName() << "' partition " << Partition << @@ -266,6 +266,8 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& for (const auto& requestedBlob : blobs) { TMaybe<ui64> firstBlobOffset = requestedBlob.Offset; + LastRequestedBlobCreationUnixTime = requestedBlob.CreationUnixTime; + for (TBlobIterator it(requestedBlob.Key, requestedBlob.Value); it.IsValid(); it.Next()) { TBatch batch = it.GetBatch(); batch.Unpack(); @@ -306,6 +308,9 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& EndProcessWritesForCompaction(compactionRequest.Get(), ctx); + // for debugging purposes + //DumpKeyValueRequest(compactionRequest->Record); + ctx.Send(BlobCache, compactionRequest.Release(), 0, 0); } @@ -468,6 +473,7 @@ void TPartition::AddNewCompactionWriteBlob(std::pair<TKey, ui32>& res, TEvKeyVal auto write = request->Record.AddCmdWrite(); write->SetKey(key.Data(), key.Size()); write->SetValue(valueD); + write->SetCreationUnixTime(LastRequestedBlobCreationUnixTime); bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 3d43948b5fa..253fa1a9026 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1005,11 +1005,15 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite, TEvKeyValue::TEvRequest* request, + ui64 creationUnixTime, const TActorContext& ctx) { auto write = request->Record.AddCmdWrite(); write->SetKey(newWrite->Key.Data(), newWrite->Key.Size()); write->SetValue(newWrite->Value); + if (creationUnixTime) { + write->SetCreationUnixTime(creationUnixTime); + } //Y_ABORT_UNLESS(newWrite->Key.IsFastWrite()); auto channel = GetChannel(NextChannel(newWrite->Key.HasSuffix(), newWrite->Value.size())); write->SetStorageChannel(channel); @@ -1020,6 +1024,13 @@ void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobIn WriteCycleSize += newWrite->Value.size(); } +void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite, + TEvKeyValue::TEvRequest* request, + const TActorContext& ctx) +{ + AddCmdWrite(newWrite, request, 0, ctx); +} + void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFormedBlobInfo>& formedBlobs, TProcessParametersBase& parameters, ui32 curWrites, diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h index fca39835e8c..47e9d200584 100644 --- a/ydb/core/persqueue/read.h +++ b/ydb/core/persqueue/read.h @@ -205,6 +205,7 @@ namespace NPQ { Y_ABORT_UNLESS(outBlobs[pos].Value.empty()); outBlobs[pos].Value = r->GetValue(); + outBlobs[pos].CreationUnixTime = r->GetCreationUnixTime(); } else { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Got Error response " << r->GetStatus() << " for " << i << "'s blob from " << resp.ReadResultSize() << " blobs"); diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 756855d49bc..8bdb87773bb 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -2653,6 +2653,92 @@ Y_UNIT_TEST(PQ_Tablet_Removes_Blobs_Asynchronously) "the PQ tablet did not delete the '" << firstMessageKey << "' key during startup"); } +Y_UNIT_TEST(The_Value_Of_CreationUnixTime_Must_Not_Decrease) +{ + auto simulateSleep = [](TDuration d, TTestContext& tc) { + tc.Runtime->AdvanceCurrentTime(d); + tc.Runtime->SimulateSleep(TDuration::MilliSeconds(1)); + }; + + auto writeMessages = [&](ui64 begin, ui64 end, size_t size, TTestContext& tc) { + for (ui64 offset = begin; offset < end; ++offset) { + TVector<std::pair<ui64, TString>> data; + data.emplace_back(offset, TString(size, 'x')); + + CmdWrite(0, "sourceId", data, tc, false, {}, true, "", -1, offset); + + simulateSleep(TDuration::MilliSeconds(1234), tc); + } + }; + + TTestContext tc; + TFinalizer finalizer(tc); + tc.Prepare(); + + // Turn off the asynchronous compactor + tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsCount(300); + tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(50_MB); + + // Create a topic with a single batch. Blobs will not be deleted by retention + PQTabletPrepare({.partitions = 1, .storageLimitBytes = 50_MB}, {}, tc); + + // Write multiple messages so that three zones appear + writeMessages(1, 20, 1_MB, tc); + writeMessages(20, 25, 40_KB, tc); + + // The asynchronous compactor will start working after restarting the tablet + tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsCount(300); + tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(8_MB); + + PQTabletRestart(tc); + + // Let the asynchronous compactor work + Sleep(TDuration::Seconds(5)); + + // We can read any of the written messages + for (i32 i = 1; i < 25; ++i) { + CmdRead(0, i, 1, Max<i32>(), 1, false, tc, {i}); + } + + // The list of keys in the PQ tablet + TVector<TString> keys; + + for (auto& key : GetTabletKeys(tc)) { + keys.push_back(std::move(key)); + } + std::sort(keys.begin(), keys.end()); + + // The value of `CreationUnixTime` must not decrease + ui64 currentCreationUnixTime = 0; + for (const auto& key : keys) { + Y_ABORT_UNLESS(!key.empty()); + if (key.front() != TKeyPrefix::TypeData) { + continue; + } + + auto request = MakeHolder<TEvKeyValue::TEvRequest>(); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + range->SetFrom(key); + range->SetIncludeFrom(true); + range->SetTo(key); + range->SetIncludeTo(true); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + auto* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + + UNIT_ASSERT_VALUES_EQUAL(result->Record.ReadRangeResultSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(result->Record.GetReadRangeResult(0).PairSize(), 1); + UNIT_ASSERT_LE_C(currentCreationUnixTime, result->Record.GetReadRangeResult(0).GetPair(0).GetCreationUnixTime(), + "key=" << key << + ", currentCreationUnixTime=" << currentCreationUnixTime << + ", result.CreationUnixTime=" << result->Record.GetReadRangeResult(0).GetPair(0).GetCreationUnixTime()); + + currentCreationUnixTime = result->Record.GetReadRangeResult(0).GetPair(0).GetCreationUnixTime(); + } +} + Y_UNIT_TEST(PQ_Tablet_Does_Not_Remove_The_Blob_Until_The_Reading_Is_Complete) { // The test verifies that the block is not deleted until the reading is finished. We write |
