summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <[email protected]>2025-06-10 17:00:17 +0300
committerGitHub <[email protected]>2025-06-10 14:00:17 +0000
commitdf34e46a9900b631bdb329914ed03e5ed2a94759 (patch)
tree4d0643037cd5e8b5b94f89f154c6da37668b1c96
parent5c8d8e7d66d7a903b981c057f9a53865003e00d3 (diff)
The `CreationUnixTime` field for `TCmdWrite` and `TCmdRename` (#19529)
-rw-r--r--ydb/core/keyvalue/keyvalue_intermediate.h2
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp26
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp126
-rw-r--r--ydb/core/protos/msgbus_kv.proto2
4 files changed, 142 insertions, 14 deletions
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h
index 0eb903be0de..c4017e8f30c 100644
--- a/ydb/core/keyvalue/keyvalue_intermediate.h
+++ b/ydb/core/keyvalue/keyvalue_intermediate.h
@@ -70,6 +70,7 @@ struct TIntermediate {
NKikimrProto::EReplyStatus Status;
TStorageStatusFlags StatusFlags;
TDuration Latency;
+ ui64 CreationUnixTime;
};
struct TDelete {
TKeyRange Range;
@@ -77,6 +78,7 @@ struct TIntermediate {
struct TRename {
TString OldKey;
TString NewKey;
+ ui64 CreationUnixTime;
};
struct TCopyRange {
TKeyRange Range;
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index e0698e78961..bba6e622324 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -68,6 +68,16 @@ bool IsKeyLengthValid(const TString& key) {
return key.length() <= MaxKeySize;
}
+template <class R, class I>
+void PrepareCreationUnixTime(const R& request, I& interm)
+{
+ if (request.HasCreationUnixTime()) {
+ interm.CreationUnixTime = request.GetCreationUnixTime();
+ } else {
+ interm.CreationUnixTime = TAppData::TimeProvider->Now().Seconds();
+ }
+}
+
// Guideline:
// Check SetError calls: there must be no changes made to the DB before SetError call (!)
@@ -1073,7 +1083,7 @@ NKikimrKeyValue::StorageChannel::StatusFlag GetStatusFlag(const TStorageStatusFl
void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse,
NKikimrKeyValue::StorageChannel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/,
TIntermediate* /*intermediate*/)
{
TIndexRecord& record = Index[request.Key];
@@ -1108,7 +1118,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
ctx.Send(ChannelBalancerActorId, new TChannelBalancer::TEvReportWriteLatency(channel, request.Latency));
}
- record.CreationUnixTime = unixTime;
+ record.CreationUnixTime = request.CreationUnixTime;
UpdateKeyValue(request.Key, record, db, ctx);
if (legacyResponse) {
@@ -1176,7 +1186,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse,
NKikimrKeyValue::StorageChannel */*response*/,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/,
TIntermediate* /*intermediate*/)
{
auto oldIter = Index.find(request.OldKey);
@@ -1186,7 +1196,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
TIndexRecord& dest = Index[request.NewKey];
Dereference(dest, db);
dest.Chain = std::move(source.Chain);
- dest.CreationUnixTime = unixTime;
+ dest.CreationUnixTime = request.CreationUnixTime;
THelpers::DbEraseUserKey(oldIter->first, db);
Index.erase(oldIter);
@@ -1318,11 +1328,10 @@ void TKeyValueState::CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleD
}
void TKeyValueState::CmdRename(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx) {
- ui64 unixTime = TAppData::TimeProvider->Now().Seconds();
for (ui32 i = 0; i < intermediate->Renames.size(); ++i) {
auto& request = intermediate->Renames[i];
auto *response = intermediate->Response.AddRenameResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get());
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, request.CreationUnixTime, intermediate.Get());
}
}
@@ -1335,11 +1344,10 @@ void TKeyValueState::CmdDelete(THolder<TIntermediate> &intermediate, ISimpleDb &
}
void TKeyValueState::CmdWrite(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx) {
- ui64 unixTime = TAppData::TimeProvider->Now().Seconds();
for (ui32 i = 0; i < intermediate->Writes.size(); ++i) {
auto& request = intermediate->Writes[i];
auto *response = intermediate->Response.AddWriteResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get());
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, request.CreationUnixTime, intermediate.Get());
}
ResourceMetrics->TryUpdate(ctx);
}
@@ -2243,6 +2251,7 @@ bool TKeyValueState::PrepareCmdRename(const TActorContext &ctx, NKikimrClient::T
interm.OldKey = request.GetOldKey();
interm.NewKey = request.GetNewKey();
+ PrepareCreationUnixTime(request, interm);
}
return false;
}
@@ -2365,6 +2374,7 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK
}
interm.Key = request.GetKey();
+ PrepareCreationUnixTime(request, interm);
switch (request.GetDataCase()) {
case NKikimrClient::TKeyValueRequest::TCmdWrite::kValue:
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 0ff3486f5cd..efc4dc2159a 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -158,8 +158,11 @@ void DoWithRetry(std::function<bool(void)> action, i32 retryCount = 2) {
void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values,
const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel,
- const NKikimrClient::TKeyValueRequest::EPriority priority, TTestContext &tc) {
+ const NKikimrClient::TKeyValueRequest::EPriority priority,
+ const TDeque<ui64>& creationUnixTimes,
+ TTestContext &tc) {
Y_ABORT_UNLESS(keys.size() == values.size());
+ Y_ABORT_UNLESS(creationUnixTimes.empty() || (creationUnixTimes.size() == keys.size()));
TAutoPtr<IEventHandle> handle;
TEvKeyValue::TEvResponse *result;
THolder<TEvKeyValue::TEvRequest> request;
@@ -172,6 +175,9 @@ void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values,
write->SetValue(values[idx]);
write->SetStorageChannel(storageChannel);
write->SetPriority(priority);
+ if (idx < creationUnixTimes.size()) {
+ write->SetCreationUnixTime(creationUnixTimes[idx]);
+ }
}
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
@@ -192,6 +198,17 @@ void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values,
});
}
+void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values,
+ const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel,
+ const NKikimrClient::TKeyValueRequest::EPriority priority,
+ TTestContext &tc) {
+ CmdWrite(keys, values,
+ storageChannel,
+ priority,
+ {},
+ tc);
+}
+
struct TDiff {
ui32 Offset;
TString Buffer;
@@ -232,10 +249,21 @@ void CmdPatch(const TString &originalKey, const TString &patchedKey, const TVect
void CmdWrite(const TString &key, const TString &value,
const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel,
+ const NKikimrClient::TKeyValueRequest::EPriority priority,
+ const ui64 creationUnixTime,
+ TTestContext &tc) {
+ TDeque<TString> keys = {key};
+ TDeque<TString> values = {value};
+ TDeque<ui64> creationUnixTimes = {creationUnixTime};
+ CmdWrite(keys, values, storageChannel, priority, creationUnixTimes, tc);
+}
+
+void CmdWrite(const TString &key, const TString &value,
+ const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel,
const NKikimrClient::TKeyValueRequest::EPriority priority, TTestContext &tc) {
TDeque<TString> keys = {key};
TDeque<TString> values = {value};
- CmdWrite(keys, values, storageChannel, priority, tc);
+ CmdWrite(keys, values, storageChannel, priority, {}, tc);
}
void CmdRead(const TDeque<TString> &keys,
@@ -276,9 +304,11 @@ void CmdRead(const TDeque<TString> &keys,
});
}
-void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, TTestContext &tc,
- bool expectOk = true) {
+void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, const TDeque<ui64>& renameUnixTimes,
+ TTestContext &tc, bool expectOk = true)
+{
Y_ABORT_UNLESS(oldKeys.size() == newKeys.size());
+ Y_ABORT_UNLESS(renameUnixTimes.empty() || (oldKeys.size() == renameUnixTimes.size()));
TAutoPtr<IEventHandle> handle;
TEvKeyValue::TEvResponse *result;
THolder<TEvKeyValue::TEvRequest> request;
@@ -290,6 +320,9 @@ void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, T
auto cmd = request->Record.AddCmdRename();
cmd->SetOldKey(oldKeys[idx]);
cmd->SetNewKey(newKeys[idx]);
+ if (idx < renameUnixTimes.size()) {
+ cmd->SetCreationUnixTime(renameUnixTimes[idx]);
+ }
}
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
@@ -311,10 +344,23 @@ void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, T
});
}
+void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, TTestContext &tc,
+ bool expectOk = true) {
+ CmdRename(oldKeys, newKeys, {}, tc, expectOk);
+}
+
+void CmdRename(const TString &oldKey, const TString &newKey, const ui64 renameUnixTime,
+ TTestContext &tc, bool expectOk = true) {
+ TDeque<TString> oldKeys = {oldKey};
+ TDeque<TString> newKeys = {newKey};
+ TDeque<ui64> renameUnixTimes = {renameUnixTime};
+ CmdRename(oldKeys, newKeys, renameUnixTimes, tc, expectOk);
+}
+
void CmdRename(const TString &oldKey, const TString &newKey, TTestContext &tc, bool expectOk = true) {
TDeque<TString> oldKeys = {oldKey};
TDeque<TString> newKeys = {newKey};
- CmdRename(oldKeys, newKeys, tc, expectOk);
+ CmdRename(oldKeys, newKeys, {}, tc, expectOk);
}
void CmdConcat(const TDeque<TString> &inputKeys, const TString &outputKey, const bool keepInputs, TTestContext &tc) {
@@ -446,6 +492,9 @@ void CheckResponse(NKikimrClient::TResponse &ar, NKikimrClient::TKeyValueRespons
}
UNIT_ASSERT_C(aPair.HasCreationUnixTime(), "Line# " << line);
//TODO: UNIT_ASSERT(aPair.GetCreationUnixTime() >= unixTime);
+ if (ePair.HasCreationUnixTime()) {
+ UNIT_ASSERT_VALUES_EQUAL_C(aPair.GetCreationUnixTime(), ePair.GetCreationUnixTime(), "Line# " << line);
+ }
}
}
}
@@ -476,10 +525,11 @@ void RunRequest(TDesiredPair<TEvKeyValue::TEvRequest> &dp, TTestContext &tc, ui6
void AddCmdReadRange(const TString &from, const bool includeFrom, const TString &to, const bool includeTo,
const bool includeData, const ui64 limitBytes,
const NKikimrClient::TKeyValueRequest::EPriority priority,
- const TDeque<TString> &expectedKeys, const TDeque<TString> &expectedValues,
+ const TDeque<TString> &expectedKeys, const TDeque<TString> &expectedValues, const TDeque<ui64>& expectedCreationUnixTimes,
const NKikimrProto::EReplyStatus expectedStatus, TTestContext &tc, TDesiredPair<TEvKeyValue::TEvRequest> &dp) {
Y_UNUSED(tc);
Y_ABORT_UNLESS(!includeData || expectedKeys.size() == expectedValues.size());
+ Y_ABORT_UNLESS(expectedCreationUnixTimes.empty() || (expectedCreationUnixTimes.size() == expectedKeys.size()));
{
auto cmd = dp.Request.AddCmdReadRange();
@@ -504,10 +554,29 @@ void AddCmdReadRange(const TString &from, const bool includeFrom, const TString
if (i < expectedValues.size()) {
pair->SetValue(expectedValues[i]);
}
+ if (i < expectedCreationUnixTimes.size()) {
+ pair->SetCreationUnixTime(expectedCreationUnixTimes[i]);
+ }
}
}
}
+void AddCmdReadRange(const TString &from, const bool includeFrom, const TString &to, const bool includeTo,
+ const bool includeData, const ui64 limitBytes,
+ const NKikimrClient::TKeyValueRequest::EPriority priority,
+ const TDeque<TString> &expectedKeys, const TDeque<TString> &expectedValues,
+ const NKikimrProto::EReplyStatus expectedStatus, TTestContext &tc, TDesiredPair<TEvKeyValue::TEvRequest> &dp) {
+ AddCmdReadRange(from, includeFrom,
+ to, includeTo,
+ includeData,
+ limitBytes,
+ priority,
+ expectedKeys, expectedValues, {},
+ expectedStatus,
+ tc,
+ dp);
+}
+
void CmdGetStatus(const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel,
const ui32 expectedStatusFlags, TTestContext &tc) {
TAutoPtr<IEventHandle> handle;
@@ -2678,5 +2747,50 @@ Y_UNIT_TEST(TestCleanUpDataWithMockDisk) {
});
}
+Y_UNIT_TEST(TestWriteAndRenameWithCreationUnixTime)
+{
+ const ui64 creationUnixTime = (TInstant::Now() - TDuration::Seconds(1000)).Seconds();
+
+ TTestContext tc;
+ TFinalizer finalizer(tc);
+ bool activeZone = false;
+ tc.Prepare(INITIAL_TEST_DISPATCH_NAME, [](TTestActorRuntime &){}, activeZone);
+
+ CmdWrite("key-1", "value",
+ NKikimrClient::TKeyValueRequest::MAIN,
+ NKikimrClient::TKeyValueRequest::REALTIME,
+ creationUnixTime,
+ tc);
+
+ {
+ TDesiredPair<TEvKeyValue::TEvRequest> dp;
+ AddCmdReadRange("key-1", true, "key-1", true,
+ false,
+ Max<ui64>(),
+ NKikimrClient::TKeyValueRequest::REALTIME,
+ {"key-1"}, {}, {creationUnixTime},
+ NKikimrProto::OK,
+ tc, dp);
+ RunRequest(dp, tc, __LINE__);
+ }
+
+ const ui64 renameUnixTime = creationUnixTime - 1000;
+
+ CmdRename("key-1", "key-2", renameUnixTime, tc);
+
+ {
+ TDesiredPair<TEvKeyValue::TEvRequest> dp;
+ AddCmdReadRange("key-2", true, "key-2", true,
+ false,
+ Max<ui64>(),
+ NKikimrClient::TKeyValueRequest::REALTIME,
+ {"key-2"}, {}, {renameUnixTime},
+ NKikimrProto::OK,
+ tc, dp);
+ RunRequest(dp, tc, __LINE__);
+ }
+
+}
+
} // TKeyValueTest
} // NKikimr
diff --git a/ydb/core/protos/msgbus_kv.proto b/ydb/core/protos/msgbus_kv.proto
index d85c2b2c5cb..7c53e0643f8 100644
--- a/ydb/core/protos/msgbus_kv.proto
+++ b/ydb/core/protos/msgbus_kv.proto
@@ -61,10 +61,12 @@ message TKeyValueRequest {
optional bytes KeyToCache = 5; // used in PQ
optional ETactic Tactic = 6 [default = MAX_THROUGHPUT]; // mandatory, used for non-inline puts only
repeated EStorageChannel AutoselectChannel = 7; // when filled, channel is selected automatically from this set
+ optional uint64 CreationUnixTime = 9;
}
message TCmdRename {
optional bytes OldKey = 1; // mandatory
optional bytes NewKey = 2; // mandatory
+ optional uint64 CreationUnixTime = 3;
}
message TCmdCopyRange {
optional TKeyRange Range = 1; // optional; if not set, whole database is copied