aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-07-21 12:28:45 +0300
committeralexvru <alexvru@ydb.tech>2023-07-21 12:28:45 +0300
commitf23ee396c023ff431442b8322dd075ac8b8dff63 (patch)
treef4598b243fbc2b45476ed9c78182a4333d38473b
parentd8bf44488de42e74f19a724f685c6e9dde6a7239 (diff)
downloadydb-f23ee396c023ff431442b8322dd075ac8b8dff63.tar.gz
Fix trash processing before actual commit KIKIMR-18784
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h8
-rw-r--r--ydb/core/keyvalue/keyvalue_intermediate.h2
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp57
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h9
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_request.cpp5
5 files changed, 46 insertions, 35 deletions
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index ecdd246c058..9b42199f91f 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -165,10 +165,12 @@ protected:
struct TTxDropRefCountsOnError : NTabletFlatExecutor::ITransaction {
std::deque<std::pair<TLogoBlobID, bool>> RefCountsIncr;
+ const ui64 RequestUid;
TKeyValueFlat *Self;
- TTxDropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, TKeyValueFlat *self)
+ TTxDropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ui64 requestUid, TKeyValueFlat *self)
: RefCountsIncr(std::move(refCountsIncr))
+ , RequestUid(requestUid)
, Self(self)
{}
@@ -178,7 +180,7 @@ protected:
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() << " TTxDropRefCountsOnError Execute");
if (!Self->State.GetIsDamaged()) {
TSimpleDbFlat db(txc.DB);
- Self->State.DropRefCountsOnErrorInTx(std::move(RefCountsIncr), db, ctx);
+ Self->State.DropRefCountsOnErrorInTx(std::move(RefCountsIncr), db, ctx, RequestUid);
}
return true;
}
@@ -385,7 +387,7 @@ protected:
State.OnRequestComplete(event.RequestUid, event.Generation, event.Step, ctx, Info(), event.Status, event.Stat);
State.DropRefCountsOnError(event.RefCountsIncr, true, ctx);
if (!event.RefCountsIncr.empty()) {
- Execute(new TTxDropRefCountsOnError(std::move(event.RefCountsIncr), this), ctx);
+ Execute(new TTxDropRefCountsOnError(std::move(event.RefCountsIncr), event.RequestUid, this), ctx);
}
}
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h
index 5f5b7e20e8f..8df55fcaed7 100644
--- a/ydb/core/keyvalue/keyvalue_intermediate.h
+++ b/ydb/core/keyvalue/keyvalue_intermediate.h
@@ -87,7 +87,7 @@ struct TIntermediate {
};
struct TGetStatus {
NKikimrClient::TKeyValueRequest::EStorageChannel StorageChannel;
- TLogoBlobID LogoBlobId;
+ ui32 GroupId;
NKikimrProto::EReplyStatus Status;
TStorageStatusFlags StatusFlags;
};
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index b39de392d84..aba59e184fb 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -754,10 +754,8 @@ TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx
}
Y_VERIFY(!CollectOperation || THelpers::GenerationStep(id) >
THelpers::TGenerationStep(CollectOperation->Header.GetCollectGeneration(), CollectOperation->Header.GetCollectStep()));
- if (requestUid) {
- ++InFlightForStep[id.Step()];
- ++RequestUidStepToCount[std::make_tuple(requestUid, id.Step())];
- }
+ ++InFlightForStep[id.Step()];
+ ++RequestUidStepToCount[std::make_tuple(requestUid, id.Step())];
return id;
}
@@ -784,7 +782,7 @@ void TKeyValueState::RequestExecute(THolder<TIntermediate> &intermediate, ISimpl
// All reads done
intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_REJECTED);
intermediate->Response.SetErrorReason(str.Str());
- DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx);
+ DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx, intermediate->RequestUid);
return;
}
}
@@ -816,7 +814,7 @@ void TKeyValueState::RequestExecute(THolder<TIntermediate> &intermediate, ISimpl
// All reads done
intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_INTERNALERROR);
intermediate->Response.SetErrorReason(str.Str());
- DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx);
+ DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx, intermediate->RequestUid);
return;
}
}
@@ -831,14 +829,20 @@ void TKeyValueState::RequestComplete(THolder<TIntermediate> &intermediate, const
const TTabletStorageInfo *info) {
Reply(intermediate, ctx, info);
+
+ if (const auto it = TrashBeingCommitted.find(intermediate->RequestUid); it != TrashBeingCommitted.end()) {
+ Trash.insert(it->second.begin(), it->second.end());
+ TrashBeingCommitted.erase(it);
+ }
+
// Make sure there is no way for a successfull delete transaction to end without getting here
PrepareCollectIfNeeded(ctx);
}
void TKeyValueState::DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db,
- const TActorContext& ctx) {
+ const TActorContext& ctx, ui64 requestUid) {
for (const auto& [logoBlobId, initial] : refCountsIncr) {
- Dereference(logoBlobId, db, ctx, initial);
+ Dereference(logoBlobId, db, ctx, initial, requestUid);
}
}
@@ -1025,10 +1029,10 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse,
NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
- TIntermediate* /*intermediate*/)
+ TIntermediate *intermediate)
{
TIndexRecord& record = Index[request.Key];
- Dereference(record, db, ctx);
+ Dereference(record, db, ctx, intermediate->RequestUid);
record.Chain = {};
ui32 storage_channel = 0;
@@ -1077,12 +1081,16 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request,
NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse,
NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/,
- TIntermediate* /*intermediate*/)
+ TIntermediate *intermediate)
{
TraverseRange(request.Range, [&](TIndex::iterator it) {
stat.Deletes++;
stat.DeleteBytes += it->second.GetFullValueSize();
- Dereference(it->second, db, ctx);
+ for (const auto& chain : it->second.Chain) {
+ STLOG(PRI_DEBUG, KEYVALUE_GC, KVC99, "DeleteRange", (TabletId, TabletId), (Generation, ExecutorGeneration),
+ (Key, it->first), (Id, chain.LogoBlobId));
+ }
+ Dereference(it->second, db, ctx, intermediate->RequestUid);
THelpers::DbEraseUserKey(it->first, db, ctx);
Index.erase(it);
});
@@ -1096,14 +1104,14 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse,
NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
- TIntermediate* /*intermediate*/)
+ TIntermediate *intermediate)
{
auto oldIter = Index.find(request.OldKey);
Y_VERIFY(oldIter != Index.end());
TIndexRecord& source = oldIter->second;
TIndexRecord& dest = Index[request.NewKey];
- Dereference(dest, db, ctx);
+ Dereference(dest, db, ctx, intermediate->RequestUid);
dest.Chain = std::move(source.Chain);
dest.CreationUnixTime = unixTime;
@@ -1142,7 +1150,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request,
TString newKey = request.PrefixToAdd + it->first.substr(request.PrefixToRemove.size());
TIndexRecord& record = Index[newKey];
- Dereference(record, db, ctx);
+ Dereference(record, db, ctx, intermediate->RequestUid);
record.Chain = sourceRecord.Chain;
record.CreationUnixTime = sourceRecord.CreationUnixTime;
UpdateKeyValue(newKey, record, db, ctx);
@@ -1180,14 +1188,14 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request,
}
if (!request.KeepInputs) {
- Dereference(input, db, ctx);
+ Dereference(input, db, ctx, intermediate->RequestUid);
THelpers::DbEraseUserKey(it->first, db, ctx);
Index.erase(it);
}
}
TIndexRecord& record = Index[request.OutputKey];
- Dereference(record, db, ctx);
+ Dereference(record, db, ctx, intermediate->RequestUid);
record.Chain = std::move(chain);
record.CreationUnixTime = unixTime;
UpdateKeyValue(request.OutputKey, record, db, ctx);
@@ -1575,7 +1583,7 @@ void TKeyValueState::ProcessCmds(THolder<TIntermediate> &intermediate, ISimpleDb
success = success && CheckCmds(intermediate, ctx, keys, info);
success = success && CheckCmdGetStatus(intermediate, ctx, keys, info);
if (!success) {
- DropRefCountsOnErrorInTx(std::exchange(intermediate->RefCountsIncr, {}), db, ctx);
+ DropRefCountsOnErrorInTx(std::exchange(intermediate->RefCountsIncr, {}), db, ctx, intermediate->RequestUid);
} else {
// Read + validate
CmdRead(intermediate, db, ctx);
@@ -1621,21 +1629,22 @@ bool TKeyValueState::IncrementGeneration(THolder<TIntermediate> &intermediate, I
return true;
}
-void TKeyValueState::Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx) {
+void TKeyValueState::Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx, ui64 requestUid) {
for (const TIndexRecord::TChainItem& item : record.Chain) {
if (!item.IsInline()) {
- Dereference(item.LogoBlobId, db, ctx, false);
+ Dereference(item.LogoBlobId, db, ctx, false, requestUid);
}
}
}
-void TKeyValueState::Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial) {
+void TKeyValueState::Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial,
+ ui64 requestUid) {
auto it = RefCounts.find(id);
Y_VERIFY(it != RefCounts.end());
--it->second;
if (!it->second) {
RefCounts.erase(it);
- Trash.insert(id);
+ TrashBeingCommitted[requestUid].push_back(id);
THelpers::DbUpdateTrash(id, db, ctx);
if (initial) {
CountInitialTrashRecord(id.BlobSize());
@@ -2273,7 +2282,7 @@ TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediat
TString msg;
if (storageChannel == NKikimrClient::TKeyValueRequest::INLINE) {
cmd.StorageChannel = storageChannel;
- cmd.LogoBlobId = TLogoBlobID();
+ cmd.GroupId = 0;
cmd.Status = NKikimrProto::OK;
cmd.StatusFlags = TStorageStatusFlags(ui32(NKikimrBlobStorage::StatusIsValid));
if (GetIsTabletYellowMove()) {
@@ -2294,7 +2303,7 @@ TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediat
}
cmd.StorageChannel = storageChannel;
- cmd.LogoBlobId = AllocateLogoBlobId(1, storageChannelIdx, 0);
+ cmd.GroupId = info->GroupFor(storageChannelIdx, ExecutorGeneration);
cmd.Status = NKikimrProto::UNKNOWN;
}
return {false, msg};
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index 17edd034494..901cd5d19ef 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -254,6 +254,7 @@ protected:
TIndex Index;
THashMap<TLogoBlobID, ui32> RefCounts;
TSet<TLogoBlobID> Trash;
+ THashMap<ui64, TVector<TLogoBlobID>> TrashBeingCommitted;
TMap<ui64, ui64> InFlightForStep;
TMap<std::tuple<ui64, ui32>, ui32> RequestUidStepToCount;
THashSet<ui64> CmdTrimLeakedBlobsUids;
@@ -336,7 +337,8 @@ public:
void RequestExecute(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx,
const TTabletStorageInfo *info);
void RequestComplete(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info);
- void DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, const TActorContext& ctx);
+ void DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, const TActorContext& ctx,
+ ui64 requestUid);
void DropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>& refCountsIncr /*in-out*/, bool writesMade,
const TActorContext& ctx);
void ProcessPostponedTrims(const TActorContext& ctx, const TTabletStorageInfo *info);
@@ -431,9 +433,10 @@ public:
return CollectOperation;
}
- void Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx);
+ void Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx, ui64 requestUid);
void UpdateKeyValue(const TString& key, const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx);
- void Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial);
+ void Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial,
+ ui64 requestUid);
ui32 GetPerGenerationCounter() {
return PerGenerationCounter;
diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp
index 0d780c546ec..84005ead266 100644
--- a/ydb/core/keyvalue/keyvalue_storage_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp
@@ -601,11 +601,8 @@ public:
auto &getStatus = IntermediateResults->GetStatuses[i];
if (getStatus.Status != NKikimrProto::OK) {
Y_VERIFY(getStatus.Status == NKikimrProto::UNKNOWN);
- const ui32 groupId = TabletInfo->GroupFor(getStatus.LogoBlobId.Channel(), getStatus.LogoBlobId.Generation());
- Y_VERIFY(groupId != Max<ui32>(), "GetStatus Blob# %s is mapped to an invalid group (-1)!",
- getStatus.LogoBlobId.ToString().c_str());
SendToBSProxy(
- ctx, groupId,
+ ctx, getStatus.GroupId,
new TEvBlobStorage::TEvStatus(IntermediateResults->Deadline), i);
++GetStatusRequestsSent;
}