diff options
author | Alek5andr-Kotov <[email protected]> | 2025-06-10 17:00:17 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-06-10 14:00:17 +0000 |
commit | df34e46a9900b631bdb329914ed03e5ed2a94759 (patch) | |
tree | 4d0643037cd5e8b5b94f89f154c6da37668b1c96 | |
parent | 5c8d8e7d66d7a903b981c057f9a53865003e00d3 (diff) |
The `CreationUnixTime` field for `TCmdWrite` and `TCmdRename` (#19529)
-rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.h | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 26 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 126 | ||||
-rw-r--r-- | ydb/core/protos/msgbus_kv.proto | 2 |
4 files changed, 142 insertions, 14 deletions
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index 0eb903be0de..c4017e8f30c 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -70,6 +70,7 @@ struct TIntermediate { NKikimrProto::EReplyStatus Status; TStorageStatusFlags StatusFlags; TDuration Latency; + ui64 CreationUnixTime; }; struct TDelete { TKeyRange Range; @@ -77,6 +78,7 @@ struct TIntermediate { struct TRename { TString OldKey; TString NewKey; + ui64 CreationUnixTime; }; struct TCopyRange { TKeyRange Range; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index e0698e78961..bba6e622324 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -68,6 +68,16 @@ bool IsKeyLengthValid(const TString& key) { return key.length() <= MaxKeySize; } +template <class R, class I> +void PrepareCreationUnixTime(const R& request, I& interm) +{ + if (request.HasCreationUnixTime()) { + interm.CreationUnixTime = request.GetCreationUnixTime(); + } else { + interm.CreationUnixTime = TAppData::TimeProvider->Now().Seconds(); + } +} + // Guideline: // Check SetError calls: there must be no changes made to the DB before SetError call (!) @@ -1073,7 +1083,7 @@ NKikimrKeyValue::StorageChannel::StatusFlag GetStatusFlag(const TStorageStatusFl void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, NKikimrKeyValue::StorageChannel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/, TIntermediate* /*intermediate*/) { TIndexRecord& record = Index[request.Key]; @@ -1108,7 +1118,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, ctx.Send(ChannelBalancerActorId, new TChannelBalancer::TEvReportWriteLatency(channel, request.Latency)); } - record.CreationUnixTime = unixTime; + record.CreationUnixTime = request.CreationUnixTime; UpdateKeyValue(request.Key, record, db, ctx); if (legacyResponse) { @@ -1176,7 +1186,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse, NKikimrKeyValue::StorageChannel */*response*/, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/, TIntermediate* /*intermediate*/) { auto oldIter = Index.find(request.OldKey); @@ -1186,7 +1196,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, TIndexRecord& dest = Index[request.NewKey]; Dereference(dest, db); dest.Chain = std::move(source.Chain); - dest.CreationUnixTime = unixTime; + dest.CreationUnixTime = request.CreationUnixTime; THelpers::DbEraseUserKey(oldIter->first, db); Index.erase(oldIter); @@ -1318,11 +1328,10 @@ void TKeyValueState::CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleD } void TKeyValueState::CmdRename(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx) { - ui64 unixTime = TAppData::TimeProvider->Now().Seconds(); for (ui32 i = 0; i < intermediate->Renames.size(); ++i) { auto& request = intermediate->Renames[i]; auto *response = intermediate->Response.AddRenameResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get()); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, request.CreationUnixTime, intermediate.Get()); } } @@ -1335,11 +1344,10 @@ void TKeyValueState::CmdDelete(THolder<TIntermediate> &intermediate, ISimpleDb & } void TKeyValueState::CmdWrite(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx) { - ui64 unixTime = TAppData::TimeProvider->Now().Seconds(); for (ui32 i = 0; i < intermediate->Writes.size(); ++i) { auto& request = intermediate->Writes[i]; auto *response = intermediate->Response.AddWriteResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get()); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, request.CreationUnixTime, intermediate.Get()); } ResourceMetrics->TryUpdate(ctx); } @@ -2243,6 +2251,7 @@ bool TKeyValueState::PrepareCmdRename(const TActorContext &ctx, NKikimrClient::T interm.OldKey = request.GetOldKey(); interm.NewKey = request.GetNewKey(); + PrepareCreationUnixTime(request, interm); } return false; } @@ -2365,6 +2374,7 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK } interm.Key = request.GetKey(); + PrepareCreationUnixTime(request, interm); switch (request.GetDataCase()) { case NKikimrClient::TKeyValueRequest::TCmdWrite::kValue: diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 0ff3486f5cd..efc4dc2159a 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -158,8 +158,11 @@ void DoWithRetry(std::function<bool(void)> action, i32 retryCount = 2) { void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values, const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, - const NKikimrClient::TKeyValueRequest::EPriority priority, TTestContext &tc) { + const NKikimrClient::TKeyValueRequest::EPriority priority, + const TDeque<ui64>& creationUnixTimes, + TTestContext &tc) { Y_ABORT_UNLESS(keys.size() == values.size()); + Y_ABORT_UNLESS(creationUnixTimes.empty() || (creationUnixTimes.size() == keys.size())); TAutoPtr<IEventHandle> handle; TEvKeyValue::TEvResponse *result; THolder<TEvKeyValue::TEvRequest> request; @@ -172,6 +175,9 @@ void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values, write->SetValue(values[idx]); write->SetStorageChannel(storageChannel); write->SetPriority(priority); + if (idx < creationUnixTimes.size()) { + write->SetCreationUnixTime(creationUnixTimes[idx]); + } } tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); @@ -192,6 +198,17 @@ void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values, }); } +void CmdWrite(const TDeque<TString> &keys, const TDeque<TString> &values, + const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, + const NKikimrClient::TKeyValueRequest::EPriority priority, + TTestContext &tc) { + CmdWrite(keys, values, + storageChannel, + priority, + {}, + tc); +} + struct TDiff { ui32 Offset; TString Buffer; @@ -232,10 +249,21 @@ void CmdPatch(const TString &originalKey, const TString &patchedKey, const TVect void CmdWrite(const TString &key, const TString &value, const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, + const NKikimrClient::TKeyValueRequest::EPriority priority, + const ui64 creationUnixTime, + TTestContext &tc) { + TDeque<TString> keys = {key}; + TDeque<TString> values = {value}; + TDeque<ui64> creationUnixTimes = {creationUnixTime}; + CmdWrite(keys, values, storageChannel, priority, creationUnixTimes, tc); +} + +void CmdWrite(const TString &key, const TString &value, + const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, const NKikimrClient::TKeyValueRequest::EPriority priority, TTestContext &tc) { TDeque<TString> keys = {key}; TDeque<TString> values = {value}; - CmdWrite(keys, values, storageChannel, priority, tc); + CmdWrite(keys, values, storageChannel, priority, {}, tc); } void CmdRead(const TDeque<TString> &keys, @@ -276,9 +304,11 @@ void CmdRead(const TDeque<TString> &keys, }); } -void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, TTestContext &tc, - bool expectOk = true) { +void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, const TDeque<ui64>& renameUnixTimes, + TTestContext &tc, bool expectOk = true) +{ Y_ABORT_UNLESS(oldKeys.size() == newKeys.size()); + Y_ABORT_UNLESS(renameUnixTimes.empty() || (oldKeys.size() == renameUnixTimes.size())); TAutoPtr<IEventHandle> handle; TEvKeyValue::TEvResponse *result; THolder<TEvKeyValue::TEvRequest> request; @@ -290,6 +320,9 @@ void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, T auto cmd = request->Record.AddCmdRename(); cmd->SetOldKey(oldKeys[idx]); cmd->SetNewKey(newKeys[idx]); + if (idx < renameUnixTimes.size()) { + cmd->SetCreationUnixTime(renameUnixTimes[idx]); + } } tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); @@ -311,10 +344,23 @@ void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, T }); } +void CmdRename(const TDeque<TString> &oldKeys, const TDeque<TString> &newKeys, TTestContext &tc, + bool expectOk = true) { + CmdRename(oldKeys, newKeys, {}, tc, expectOk); +} + +void CmdRename(const TString &oldKey, const TString &newKey, const ui64 renameUnixTime, + TTestContext &tc, bool expectOk = true) { + TDeque<TString> oldKeys = {oldKey}; + TDeque<TString> newKeys = {newKey}; + TDeque<ui64> renameUnixTimes = {renameUnixTime}; + CmdRename(oldKeys, newKeys, renameUnixTimes, tc, expectOk); +} + void CmdRename(const TString &oldKey, const TString &newKey, TTestContext &tc, bool expectOk = true) { TDeque<TString> oldKeys = {oldKey}; TDeque<TString> newKeys = {newKey}; - CmdRename(oldKeys, newKeys, tc, expectOk); + CmdRename(oldKeys, newKeys, {}, tc, expectOk); } void CmdConcat(const TDeque<TString> &inputKeys, const TString &outputKey, const bool keepInputs, TTestContext &tc) { @@ -446,6 +492,9 @@ void CheckResponse(NKikimrClient::TResponse &ar, NKikimrClient::TKeyValueRespons } UNIT_ASSERT_C(aPair.HasCreationUnixTime(), "Line# " << line); //TODO: UNIT_ASSERT(aPair.GetCreationUnixTime() >= unixTime); + if (ePair.HasCreationUnixTime()) { + UNIT_ASSERT_VALUES_EQUAL_C(aPair.GetCreationUnixTime(), ePair.GetCreationUnixTime(), "Line# " << line); + } } } } @@ -476,10 +525,11 @@ void RunRequest(TDesiredPair<TEvKeyValue::TEvRequest> &dp, TTestContext &tc, ui6 void AddCmdReadRange(const TString &from, const bool includeFrom, const TString &to, const bool includeTo, const bool includeData, const ui64 limitBytes, const NKikimrClient::TKeyValueRequest::EPriority priority, - const TDeque<TString> &expectedKeys, const TDeque<TString> &expectedValues, + const TDeque<TString> &expectedKeys, const TDeque<TString> &expectedValues, const TDeque<ui64>& expectedCreationUnixTimes, const NKikimrProto::EReplyStatus expectedStatus, TTestContext &tc, TDesiredPair<TEvKeyValue::TEvRequest> &dp) { Y_UNUSED(tc); Y_ABORT_UNLESS(!includeData || expectedKeys.size() == expectedValues.size()); + Y_ABORT_UNLESS(expectedCreationUnixTimes.empty() || (expectedCreationUnixTimes.size() == expectedKeys.size())); { auto cmd = dp.Request.AddCmdReadRange(); @@ -504,10 +554,29 @@ void AddCmdReadRange(const TString &from, const bool includeFrom, const TString if (i < expectedValues.size()) { pair->SetValue(expectedValues[i]); } + if (i < expectedCreationUnixTimes.size()) { + pair->SetCreationUnixTime(expectedCreationUnixTimes[i]); + } } } } +void AddCmdReadRange(const TString &from, const bool includeFrom, const TString &to, const bool includeTo, + const bool includeData, const ui64 limitBytes, + const NKikimrClient::TKeyValueRequest::EPriority priority, + const TDeque<TString> &expectedKeys, const TDeque<TString> &expectedValues, + const NKikimrProto::EReplyStatus expectedStatus, TTestContext &tc, TDesiredPair<TEvKeyValue::TEvRequest> &dp) { + AddCmdReadRange(from, includeFrom, + to, includeTo, + includeData, + limitBytes, + priority, + expectedKeys, expectedValues, {}, + expectedStatus, + tc, + dp); +} + void CmdGetStatus(const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, const ui32 expectedStatusFlags, TTestContext &tc) { TAutoPtr<IEventHandle> handle; @@ -2678,5 +2747,50 @@ Y_UNIT_TEST(TestCleanUpDataWithMockDisk) { }); } +Y_UNIT_TEST(TestWriteAndRenameWithCreationUnixTime) +{ + const ui64 creationUnixTime = (TInstant::Now() - TDuration::Seconds(1000)).Seconds(); + + TTestContext tc; + TFinalizer finalizer(tc); + bool activeZone = false; + tc.Prepare(INITIAL_TEST_DISPATCH_NAME, [](TTestActorRuntime &){}, activeZone); + + CmdWrite("key-1", "value", + NKikimrClient::TKeyValueRequest::MAIN, + NKikimrClient::TKeyValueRequest::REALTIME, + creationUnixTime, + tc); + + { + TDesiredPair<TEvKeyValue::TEvRequest> dp; + AddCmdReadRange("key-1", true, "key-1", true, + false, + Max<ui64>(), + NKikimrClient::TKeyValueRequest::REALTIME, + {"key-1"}, {}, {creationUnixTime}, + NKikimrProto::OK, + tc, dp); + RunRequest(dp, tc, __LINE__); + } + + const ui64 renameUnixTime = creationUnixTime - 1000; + + CmdRename("key-1", "key-2", renameUnixTime, tc); + + { + TDesiredPair<TEvKeyValue::TEvRequest> dp; + AddCmdReadRange("key-2", true, "key-2", true, + false, + Max<ui64>(), + NKikimrClient::TKeyValueRequest::REALTIME, + {"key-2"}, {}, {renameUnixTime}, + NKikimrProto::OK, + tc, dp); + RunRequest(dp, tc, __LINE__); + } + +} + } // TKeyValueTest } // NKikimr diff --git a/ydb/core/protos/msgbus_kv.proto b/ydb/core/protos/msgbus_kv.proto index d85c2b2c5cb..7c53e0643f8 100644 --- a/ydb/core/protos/msgbus_kv.proto +++ b/ydb/core/protos/msgbus_kv.proto @@ -61,10 +61,12 @@ message TKeyValueRequest { optional bytes KeyToCache = 5; // used in PQ optional ETactic Tactic = 6 [default = MAX_THROUGHPUT]; // mandatory, used for non-inline puts only repeated EStorageChannel AutoselectChannel = 7; // when filled, channel is selected automatically from this set + optional uint64 CreationUnixTime = 9; } message TCmdRename { optional bytes OldKey = 1; // mandatory optional bytes NewKey = 2; // mandatory + optional uint64 CreationUnixTime = 3; } message TCmdCopyRange { optional TKeyRange Range = 1; // optional; if not set, whole database is copied |