summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/persqueue/events/internal.h1
-rw-r--r--ydb/core/persqueue/partition.h6
-rw-r--r--ydb/core/persqueue/partition_compaction.cpp8
-rw-r--r--ydb/core/persqueue/partition_write.cpp11
-rw-r--r--ydb/core/persqueue/read.h1
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp86
6 files changed, 112 insertions, 1 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index a815ce54a1d..4f274aba918 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -57,6 +57,7 @@ namespace NPQ {
TString Value;
bool Cached;
TKey Key;
+ ui64 CreationUnixTime = 0;
TRequestedBlob() = delete;
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index e05a3c926b1..ec89b341309 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -1004,6 +1004,10 @@ private:
void AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite,
TEvKeyValue::TEvRequest* request,
+ ui64 creationUnixTime,
+ const TActorContext& ctx);
+ void AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite,
+ TEvKeyValue::TEvRequest* request,
const TActorContext& ctx);
void RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFormedBlobInfo>& formedBlobs,
TProcessParametersBase& parameters,
@@ -1054,6 +1058,8 @@ private:
TInstant GetFirstUncompactedBlobTimestamp() const;
void TryCorrectStartOffset(TMaybe<ui64> offset);
+
+ ui64 LastRequestedBlobCreationUnixTime = 0;
};
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_compaction.cpp b/ydb/core/persqueue/partition_compaction.cpp
index a49c597427d..6c8f117340c 100644
--- a/ydb/core/persqueue/partition_compaction.cpp
+++ b/ydb/core/persqueue/partition_compaction.cpp
@@ -85,7 +85,7 @@ bool TPartition::ExecRequestForCompaction(TWriteMsg& p, TProcessParametersBase&
auto newWrite = CompactionBlobEncoder.PartitionedBlob.Add(std::move(blob));
if (newWrite && !newWrite->Value.empty()) {
- AddCmdWrite(newWrite, request, ctx);
+ AddCmdWrite(newWrite, request, LastRequestedBlobCreationUnixTime, ctx);
PQ_LOG_D("Topic '" << TopicName() <<
"' partition " << Partition <<
@@ -266,6 +266,8 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
for (const auto& requestedBlob : blobs) {
TMaybe<ui64> firstBlobOffset = requestedBlob.Offset;
+ LastRequestedBlobCreationUnixTime = requestedBlob.CreationUnixTime;
+
for (TBlobIterator it(requestedBlob.Key, requestedBlob.Value); it.IsValid(); it.Next()) {
TBatch batch = it.GetBatch();
batch.Unpack();
@@ -306,6 +308,9 @@ void TPartition::BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>&
EndProcessWritesForCompaction(compactionRequest.Get(), ctx);
+ // for debugging purposes
+ //DumpKeyValueRequest(compactionRequest->Record);
+
ctx.Send(BlobCache, compactionRequest.Release(), 0, 0);
}
@@ -468,6 +473,7 @@ void TPartition::AddNewCompactionWriteBlob(std::pair<TKey, ui32>& res, TEvKeyVal
auto write = request->Record.AddCmdWrite();
write->SetKey(key.Data(), key.Size());
write->SetValue(valueD);
+ write->SetCreationUnixTime(LastRequestedBlobCreationUnixTime);
bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 3d43948b5fa..253fa1a9026 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1005,11 +1005,15 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite,
TEvKeyValue::TEvRequest* request,
+ ui64 creationUnixTime,
const TActorContext& ctx)
{
auto write = request->Record.AddCmdWrite();
write->SetKey(newWrite->Key.Data(), newWrite->Key.Size());
write->SetValue(newWrite->Value);
+ if (creationUnixTime) {
+ write->SetCreationUnixTime(creationUnixTime);
+ }
//Y_ABORT_UNLESS(newWrite->Key.IsFastWrite());
auto channel = GetChannel(NextChannel(newWrite->Key.HasSuffix(), newWrite->Value.size()));
write->SetStorageChannel(channel);
@@ -1020,6 +1024,13 @@ void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobIn
WriteCycleSize += newWrite->Value.size();
}
+void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobInfo>& newWrite,
+ TEvKeyValue::TEvRequest* request,
+ const TActorContext& ctx)
+{
+ AddCmdWrite(newWrite, request, 0, ctx);
+}
+
void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFormedBlobInfo>& formedBlobs,
TProcessParametersBase& parameters,
ui32 curWrites,
diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h
index fca39835e8c..47e9d200584 100644
--- a/ydb/core/persqueue/read.h
+++ b/ydb/core/persqueue/read.h
@@ -205,6 +205,7 @@ namespace NPQ {
Y_ABORT_UNLESS(outBlobs[pos].Value.empty());
outBlobs[pos].Value = r->GetValue();
+ outBlobs[pos].CreationUnixTime = r->GetCreationUnixTime();
} else {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Got Error response " << r->GetStatus()
<< " for " << i << "'s blob from " << resp.ReadResultSize() << " blobs");
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index 756855d49bc..8bdb87773bb 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -2653,6 +2653,92 @@ Y_UNIT_TEST(PQ_Tablet_Removes_Blobs_Asynchronously)
"the PQ tablet did not delete the '" << firstMessageKey << "' key during startup");
}
+Y_UNIT_TEST(The_Value_Of_CreationUnixTime_Must_Not_Decrease)
+{
+ auto simulateSleep = [](TDuration d, TTestContext& tc) {
+ tc.Runtime->AdvanceCurrentTime(d);
+ tc.Runtime->SimulateSleep(TDuration::MilliSeconds(1));
+ };
+
+ auto writeMessages = [&](ui64 begin, ui64 end, size_t size, TTestContext& tc) {
+ for (ui64 offset = begin; offset < end; ++offset) {
+ TVector<std::pair<ui64, TString>> data;
+ data.emplace_back(offset, TString(size, 'x'));
+
+ CmdWrite(0, "sourceId", data, tc, false, {}, true, "", -1, offset);
+
+ simulateSleep(TDuration::MilliSeconds(1234), tc);
+ }
+ };
+
+ TTestContext tc;
+ TFinalizer finalizer(tc);
+ tc.Prepare();
+
+ // Turn off the asynchronous compactor
+ tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsCount(300);
+ tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(50_MB);
+
+ // Create a topic with a single batch. Blobs will not be deleted by retention
+ PQTabletPrepare({.partitions = 1, .storageLimitBytes = 50_MB}, {}, tc);
+
+ // Write multiple messages so that three zones appear
+ writeMessages(1, 20, 1_MB, tc);
+ writeMessages(20, 25, 40_KB, tc);
+
+ // The asynchronous compactor will start working after restarting the tablet
+ tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsCount(300);
+ tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(8_MB);
+
+ PQTabletRestart(tc);
+
+ // Let the asynchronous compactor work
+ Sleep(TDuration::Seconds(5));
+
+ // We can read any of the written messages
+ for (i32 i = 1; i < 25; ++i) {
+ CmdRead(0, i, 1, Max<i32>(), 1, false, tc, {i});
+ }
+
+ // The list of keys in the PQ tablet
+ TVector<TString> keys;
+
+ for (auto& key : GetTabletKeys(tc)) {
+ keys.push_back(std::move(key));
+ }
+ std::sort(keys.begin(), keys.end());
+
+ // The value of `CreationUnixTime` must not decrease
+ ui64 currentCreationUnixTime = 0;
+ for (const auto& key : keys) {
+ Y_ABORT_UNLESS(!key.empty());
+ if (key.front() != TKeyPrefix::TypeData) {
+ continue;
+ }
+
+ auto request = MakeHolder<TEvKeyValue::TEvRequest>();
+ auto read = request->Record.AddCmdReadRange();
+ auto range = read->MutableRange();
+ range->SetFrom(key);
+ range->SetIncludeFrom(true);
+ range->SetTo(key);
+ range->SetIncludeTo(true);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ auto* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+
+ UNIT_ASSERT_VALUES_EQUAL(result->Record.ReadRangeResultSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(result->Record.GetReadRangeResult(0).PairSize(), 1);
+ UNIT_ASSERT_LE_C(currentCreationUnixTime, result->Record.GetReadRangeResult(0).GetPair(0).GetCreationUnixTime(),
+ "key=" << key <<
+ ", currentCreationUnixTime=" << currentCreationUnixTime <<
+ ", result.CreationUnixTime=" << result->Record.GetReadRangeResult(0).GetPair(0).GetCreationUnixTime());
+
+ currentCreationUnixTime = result->Record.GetReadRangeResult(0).GetPair(0).GetCreationUnixTime();
+ }
+}
+
Y_UNIT_TEST(PQ_Tablet_Does_Not_Remove_The_Blob_Until_The_Reading_Is_Complete)
{
// The test verifies that the block is not deleted until the reading is finished. We write