diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-28 16:10:36 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-28 16:10:36 +0300 |
commit | 6e4acf929becf5890e533653404e8b74d60983c3 (patch) | |
tree | 2902496455c81c051acb6e30d36c5f626a36c3b7 | |
parent | a14e31648f51ccc1071d5fc2fc2f04eb09537a9a (diff) | |
download | ydb-6e4acf929becf5890e533653404e8b74d60983c3.tar.gz |
Fix blob leakage KIKIMR-14427
ref:4ebd632b13defc653131012b12c083df26cb9e60
-rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.h | 1 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 95 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 16 |
3 files changed, 48 insertions, 64 deletions
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index c28c14b9c3..38698a9c73 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -113,6 +113,7 @@ struct TIntermediate { TDeque<TGetStatus> GetStatuses; TMaybe<TTrimLeakedBlobs> TrimLeakedBlobs; TMaybe<TSetExecutorFastLogPolicy> SetExecutorFastLogPolicy; + std::deque<std::pair<TLogoBlobID, bool>> RefCountsIncr; TStackVec<TCmd, 1> Commands; TStackVec<ui32, 1> WriteIndices; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 6b31c463f8..11f1ba55a2 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -894,7 +894,8 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon void TKeyValueState::ProcessCmd(TIntermediate::TRead &request, NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse, NKikimrKeyValue::Channel */*response*/, - ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/) + ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, + TIntermediate* /*intermediate*/) { NKikimrProto::EReplyStatus outStatus = request.CumulativeStatus(); request.Status = outStatus; @@ -933,7 +934,8 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRead &request, void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request, NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse, NKikimrKeyValue::Channel */*response*/, - ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/) + ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, + TIntermediate* /*intermediate*/) { for (ui64 r = 0; r < request.Reads.size(); ++r) { auto &read = request.Reads[r]; @@ -1015,7 +1017,8 @@ void SetStatusFlags(NKikimrKeyValue::Flags *flags, const TStorageStatusFlags &st void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime) + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, + TIntermediate* /*intermediate*/) { TIndexRecord& record = Index[request.Key]; Dereference(record, db, ctx); @@ -1066,7 +1069,8 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, NKikimrKeyValue::Channel */*response*/, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/) + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/, + TIntermediate* /*intermediate*/) { TraverseRange(request.Range, [&](TIndex::iterator it) { stat.Deletes++; @@ -1084,7 +1088,8 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse, NKikimrKeyValue::Channel */*response*/, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime) + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, + TIntermediate* /*intermediate*/) { auto oldIter = Index.find(request.OldKey); Y_VERIFY(oldIter != Index.end()); @@ -1108,7 +1113,8 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request, NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse, NKikimrKeyValue::Channel */*response*/, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/) + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/, + TIntermediate *intermediate) { TVector<TIndex::iterator> itemsToClone; @@ -1123,6 +1129,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request, for (const TIndexRecord::TChainItem& item : sourceRecord.Chain) { if (!item.IsInline()) { ++RefCounts[item.LogoBlobId]; + intermediate->RefCountsIncr.emplace_back(item.LogoBlobId, false); } } @@ -1142,7 +1149,8 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request, void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request, NKikimrClient::TKeyValueResponse::TConcatResult *legacyResponse, NKikimrKeyValue::Channel */*response*/, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime) + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, + TIntermediate *intermediate) { TVector<TIndexRecord::TChainItem> chain; ui64 offset = 0; @@ -1159,6 +1167,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request, const TLogoBlobID& id = chainItem.LogoBlobId; chain.push_back(TIndexRecord::TChainItem(id, offset)); ++RefCounts[id]; + intermediate->RefCountsIncr.emplace_back(id, false); } offset += chainItem.GetSize(); } @@ -1185,12 +1194,12 @@ void TKeyValueState::CmdRead(THolder<TIntermediate> &intermediate, ISimpleDb &db for (ui64 i = 0; i < intermediate->Reads.size(); ++i) { auto &request = intermediate->Reads[i]; auto *response = intermediate->Response.AddReadResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get()); } if (intermediate->ReadCommand && std::holds_alternative<TIntermediate::TRead>(*intermediate->ReadCommand)) { auto &request = std::get<TIntermediate::TRead>(*intermediate->ReadCommand); auto *response = intermediate->Response.AddReadResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get()); } } @@ -1200,12 +1209,12 @@ void TKeyValueState::CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleD for (ui64 i = 0; i < intermediate->RangeReads.size(); ++i) { auto &rangeRead = intermediate->RangeReads[i]; auto *rangeReadResult = intermediate->Response.AddReadRangeResult(); - ProcessCmd(rangeRead, rangeReadResult, nullptr, db, ctx, intermediate->Stat, 0); + ProcessCmd(rangeRead, rangeReadResult, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get()); } if (intermediate->ReadCommand && std::holds_alternative<TIntermediate::TRangeRead>(*intermediate->ReadCommand)) { auto &request = std::get<TIntermediate::TRangeRead>(*intermediate->ReadCommand); auto *response = intermediate->Response.AddReadRangeResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get()); } } @@ -1214,7 +1223,7 @@ void TKeyValueState::CmdRename(THolder<TIntermediate> &intermediate, ISimpleDb & for (ui32 i = 0; i < intermediate->Renames.size(); ++i) { auto& request = intermediate->Renames[i]; auto *response = intermediate->Response.AddRenameResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get()); } } @@ -1222,7 +1231,7 @@ void TKeyValueState::CmdDelete(THolder<TIntermediate> &intermediate, ISimpleDb & for (ui32 i = 0; i < intermediate->Deletes.size(); ++i) { auto& request = intermediate->Deletes[i]; auto *response = intermediate->Response.AddDeleteRangeResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get()); } } @@ -1231,7 +1240,7 @@ void TKeyValueState::CmdWrite(THolder<TIntermediate> &intermediate, ISimpleDb &d for (ui32 i = 0; i < intermediate->Writes.size(); ++i) { auto& request = intermediate->Writes[i]; auto *response = intermediate->Response.AddWriteResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get()); } ResourceMetrics->TryUpdate(ctx); } @@ -1273,7 +1282,7 @@ void TKeyValueState::CmdGetStatus(THolder<TIntermediate> &intermediate, ISimpleD void TKeyValueState::CmdCopyRange(THolder<TIntermediate>& intermediate, ISimpleDb& db, const TActorContext& ctx) { for (const auto& request : intermediate->CopyRanges) { auto *response = intermediate->Response.AddCopyRangeResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get()); } } @@ -1281,7 +1290,7 @@ void TKeyValueState::CmdConcat(THolder<TIntermediate>& intermediate, ISimpleDb& ui64 unixTime = TAppData::TimeProvider->Now().Seconds(); for (const auto& request : intermediate->Concats) { auto *response = intermediate->Response.AddConcatResult(); - ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime); + ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get()); } } @@ -1366,7 +1375,7 @@ void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db } }; auto process = [&](auto &cmd) { - ProcessCmd(cmd, getLegacyResponse(cmd), getChannel(cmd), db, ctx, intermediate->Stat, unixTime); + ProcessCmd(cmd, getLegacyResponse(cmd), getChannel(cmd), db, ctx, intermediate->Stat, unixTime, intermediate.Get()); }; for (auto &cmd : intermediate->Commands) { std::visit(process, cmd); @@ -1559,19 +1568,8 @@ void TKeyValueState::ProcessCmds(THolder<TIntermediate> &intermediate, ISimpleDb success = success && CheckCmds(intermediate, ctx, keys, info); success = success && CheckCmdGetStatus(intermediate, ctx, keys, info); if (!success) { - for (const auto& cmd : intermediate->Writes) { - for (const TLogoBlobID& logoBlobId : cmd.LogoBlobIds) { - Dereference(logoBlobId, db, ctx, true); - } - } - for (const auto& cmd : intermediate->Commands) { - if (!std::holds_alternative<TIntermediate::TWrite>(cmd)) { - continue; - } - auto& write = std::get<TIntermediate::TWrite>(cmd); - for (const TLogoBlobID& logoBlobId : write.LogoBlobIds) { - Dereference(logoBlobId, db, ctx, true); - } + for (const auto& [logoBlobId, initial] : std::exchange(intermediate->RefCountsIncr, {})) { + Dereference(logoBlobId, db, ctx, initial); } } else { // Read + validate @@ -2141,7 +2139,8 @@ bool TKeyValueState::PrepareCmdDelete(const TActorContext &ctx, NKikimrClient::T return false; } -void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx) { +void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx, + TIntermediate *intermediate) { if (isInline) { cmd.Status = NKikimrProto::SCHEDULED; cmd.StatusFlags = TStorageStatusFlags(ui32(NKikimrBlobStorage::StatusIsValid)); @@ -2162,6 +2161,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u for (const TLogoBlobID& logoBlobId : cmd.LogoBlobIds) { ui32 newRefCount = ++RefCounts[logoBlobId]; Y_VERIFY(newRefCount == 1); + intermediate->RefCountsIncr.emplace_back(logoBlobId, true); } } } @@ -2249,7 +2249,7 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK break; } } - SplitIntoBlobs(interm, isInline, storageChannelIdx); + SplitIntoBlobs(interm, isInline, storageChannelIdx, intermediate.Get()); } return false; } @@ -2463,7 +2463,7 @@ TPrepareResult TKeyValueState::PrepareOneCmd(const TCommand::Write &request, THo storageChannelIdx = BLOB_CHANNEL; } } - SplitIntoBlobs(cmd, isInline, storageChannelIdx); + SplitIntoBlobs(cmd, isInline, storageChannelIdx, intermediate.Get()); return {}; } @@ -2712,14 +2712,9 @@ bool TKeyValueState::PrepareExecuteTransactionRequest(const TActorContext &ctx, TPrepareResult result = PrepareCommands(request, intermediate, info, ctx); if (result.WithError) { - // discard allocated LogoBlobIds from the Write commands - for (ui32 idx : intermediate->WriteIndices) { - const auto &cmd = intermediate->Commands[idx]; - Y_VERIFY(std::holds_alternative<TIntermediate::TWrite>(cmd)); - auto &write = std::get<TIntermediate::TWrite>(cmd); - for (const TLogoBlobID &id : write.LogoBlobIds) { - const ui32 count = RefCounts.erase(id); - Y_VERIFY(count == 1); + for (const auto& [logoBlobId, initial] : std::exchange(intermediate->RefCountsIncr, {})) { + if (const auto it = RefCounts.find(logoBlobId); it != RefCounts.end() && !--it->second) { + RefCounts.erase(it); } } @@ -3101,21 +3096,9 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol intermediate->ConcatCount = request.CmdConcatSize(); if (error) { - // discard allocated LogoBlobIds from the Write commands - for (const auto &write : intermediate->Writes) { - for (const TLogoBlobID &id : write.LogoBlobIds) { - const ui32 count = RefCounts.erase(id); - Y_VERIFY(count == 1); - } - } - - for (ui32 idx : intermediate->WriteIndices) { - const auto &cmd = intermediate->Commands[idx]; - Y_VERIFY(std::holds_alternative<TIntermediate::TWrite>(cmd)); - auto &write = std::get<TIntermediate::TWrite>(cmd); - for (const TLogoBlobID &id : write.LogoBlobIds) { - const ui32 count = RefCounts.erase(id); - Y_VERIFY(count == 1); + for (const auto& [logoBlobId, initial] : std::exchange(intermediate->RefCountsIncr, {})) { + if (const auto it = RefCounts.find(logoBlobId); it != RefCounts.end() && !--it->second) { + RefCounts.erase(it); } } diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index f57a69d35d..312a2115cf 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -337,31 +337,31 @@ public: void ProcessCmd(TIntermediate::TRead &read, NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(TIntermediate::TRangeRead &request, NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(TIntermediate::TWrite &request, NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TRename &request, NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TCopyRange &request, NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TConcat &request, NKikimrClient::TKeyValueResponse::TConcatResult *resplegacyResponseonse, NKikimrKeyValue::Channel *response, - ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime); + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void CmdRead(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx); void CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx); void CmdRename(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx); @@ -492,7 +492,7 @@ public: return false; } - void SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx); + void SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx, TIntermediate *intermediate); bool PrepareCmdRead(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, THolder<TIntermediate> &intermediate, bool &outIsInlineOnly); |