aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-28 16:10:36 +0300
committerAlexander Rutkovsky <alexvru@mail.ru>2022-02-28 16:10:36 +0300
commit6e4acf929becf5890e533653404e8b74d60983c3 (patch)
tree2902496455c81c051acb6e30d36c5f626a36c3b7
parenta14e31648f51ccc1071d5fc2fc2f04eb09537a9a (diff)
downloadydb-6e4acf929becf5890e533653404e8b74d60983c3.tar.gz
Fix blob leakage KIKIMR-14427
ref:4ebd632b13defc653131012b12c083df26cb9e60
-rw-r--r--ydb/core/keyvalue/keyvalue_intermediate.h1
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp95
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h16
3 files changed, 48 insertions, 64 deletions
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h
index c28c14b9c3..38698a9c73 100644
--- a/ydb/core/keyvalue/keyvalue_intermediate.h
+++ b/ydb/core/keyvalue/keyvalue_intermediate.h
@@ -113,6 +113,7 @@ struct TIntermediate {
TDeque<TGetStatus> GetStatuses;
TMaybe<TTrimLeakedBlobs> TrimLeakedBlobs;
TMaybe<TSetExecutorFastLogPolicy> SetExecutorFastLogPolicy;
+ std::deque<std::pair<TLogoBlobID, bool>> RefCountsIncr;
TStackVec<TCmd, 1> Commands;
TStackVec<ui32, 1> WriteIndices;
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index 6b31c463f8..11f1ba55a2 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -894,7 +894,8 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon
void TKeyValueState::ProcessCmd(TIntermediate::TRead &request,
NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse,
NKikimrKeyValue::Channel */*response*/,
- ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/)
+ ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/,
+ TIntermediate* /*intermediate*/)
{
NKikimrProto::EReplyStatus outStatus = request.CumulativeStatus();
request.Status = outStatus;
@@ -933,7 +934,8 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRead &request,
void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,
NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse,
NKikimrKeyValue::Channel */*response*/,
- ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/)
+ ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/,
+ TIntermediate* /*intermediate*/)
{
for (ui64 r = 0; r < request.Reads.size(); ++r) {
auto &read = request.Reads[r];
@@ -1015,7 +1017,8 @@ void SetStatusFlags(NKikimrKeyValue::Flags *flags, const TStorageStatusFlags &st
void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime)
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
+ TIntermediate* /*intermediate*/)
{
TIndexRecord& record = Index[request.Key];
Dereference(record, db, ctx);
@@ -1066,7 +1069,8 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request,
NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse,
NKikimrKeyValue::Channel */*response*/,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/)
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/,
+ TIntermediate* /*intermediate*/)
{
TraverseRange(request.Range, [&](TIndex::iterator it) {
stat.Deletes++;
@@ -1084,7 +1088,8 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse,
NKikimrKeyValue::Channel */*response*/,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime)
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
+ TIntermediate* /*intermediate*/)
{
auto oldIter = Index.find(request.OldKey);
Y_VERIFY(oldIter != Index.end());
@@ -1108,7 +1113,8 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request,
NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse,
NKikimrKeyValue::Channel */*response*/,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/)
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/,
+ TIntermediate *intermediate)
{
TVector<TIndex::iterator> itemsToClone;
@@ -1123,6 +1129,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request,
for (const TIndexRecord::TChainItem& item : sourceRecord.Chain) {
if (!item.IsInline()) {
++RefCounts[item.LogoBlobId];
+ intermediate->RefCountsIncr.emplace_back(item.LogoBlobId, false);
}
}
@@ -1142,7 +1149,8 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request,
NKikimrClient::TKeyValueResponse::TConcatResult *legacyResponse,
NKikimrKeyValue::Channel */*response*/,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime)
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
+ TIntermediate *intermediate)
{
TVector<TIndexRecord::TChainItem> chain;
ui64 offset = 0;
@@ -1159,6 +1167,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request,
const TLogoBlobID& id = chainItem.LogoBlobId;
chain.push_back(TIndexRecord::TChainItem(id, offset));
++RefCounts[id];
+ intermediate->RefCountsIncr.emplace_back(id, false);
}
offset += chainItem.GetSize();
}
@@ -1185,12 +1194,12 @@ void TKeyValueState::CmdRead(THolder<TIntermediate> &intermediate, ISimpleDb &db
for (ui64 i = 0; i < intermediate->Reads.size(); ++i) {
auto &request = intermediate->Reads[i];
auto *response = intermediate->Response.AddReadResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get());
}
if (intermediate->ReadCommand && std::holds_alternative<TIntermediate::TRead>(*intermediate->ReadCommand)) {
auto &request = std::get<TIntermediate::TRead>(*intermediate->ReadCommand);
auto *response = intermediate->Response.AddReadResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get());
}
}
@@ -1200,12 +1209,12 @@ void TKeyValueState::CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleD
for (ui64 i = 0; i < intermediate->RangeReads.size(); ++i) {
auto &rangeRead = intermediate->RangeReads[i];
auto *rangeReadResult = intermediate->Response.AddReadRangeResult();
- ProcessCmd(rangeRead, rangeReadResult, nullptr, db, ctx, intermediate->Stat, 0);
+ ProcessCmd(rangeRead, rangeReadResult, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get());
}
if (intermediate->ReadCommand && std::holds_alternative<TIntermediate::TRangeRead>(*intermediate->ReadCommand)) {
auto &request = std::get<TIntermediate::TRangeRead>(*intermediate->ReadCommand);
auto *response = intermediate->Response.AddReadRangeResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get());
}
}
@@ -1214,7 +1223,7 @@ void TKeyValueState::CmdRename(THolder<TIntermediate> &intermediate, ISimpleDb &
for (ui32 i = 0; i < intermediate->Renames.size(); ++i) {
auto& request = intermediate->Renames[i];
auto *response = intermediate->Response.AddRenameResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get());
}
}
@@ -1222,7 +1231,7 @@ void TKeyValueState::CmdDelete(THolder<TIntermediate> &intermediate, ISimpleDb &
for (ui32 i = 0; i < intermediate->Deletes.size(); ++i) {
auto& request = intermediate->Deletes[i];
auto *response = intermediate->Response.AddDeleteRangeResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get());
}
}
@@ -1231,7 +1240,7 @@ void TKeyValueState::CmdWrite(THolder<TIntermediate> &intermediate, ISimpleDb &d
for (ui32 i = 0; i < intermediate->Writes.size(); ++i) {
auto& request = intermediate->Writes[i];
auto *response = intermediate->Response.AddWriteResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get());
}
ResourceMetrics->TryUpdate(ctx);
}
@@ -1273,7 +1282,7 @@ void TKeyValueState::CmdGetStatus(THolder<TIntermediate> &intermediate, ISimpleD
void TKeyValueState::CmdCopyRange(THolder<TIntermediate>& intermediate, ISimpleDb& db, const TActorContext& ctx) {
for (const auto& request : intermediate->CopyRanges) {
auto *response = intermediate->Response.AddCopyRangeResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, 0, intermediate.Get());
}
}
@@ -1281,7 +1290,7 @@ void TKeyValueState::CmdConcat(THolder<TIntermediate>& intermediate, ISimpleDb&
ui64 unixTime = TAppData::TimeProvider->Now().Seconds();
for (const auto& request : intermediate->Concats) {
auto *response = intermediate->Response.AddConcatResult();
- ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime);
+ ProcessCmd(request, response, nullptr, db, ctx, intermediate->Stat, unixTime, intermediate.Get());
}
}
@@ -1366,7 +1375,7 @@ void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db
}
};
auto process = [&](auto &cmd) {
- ProcessCmd(cmd, getLegacyResponse(cmd), getChannel(cmd), db, ctx, intermediate->Stat, unixTime);
+ ProcessCmd(cmd, getLegacyResponse(cmd), getChannel(cmd), db, ctx, intermediate->Stat, unixTime, intermediate.Get());
};
for (auto &cmd : intermediate->Commands) {
std::visit(process, cmd);
@@ -1559,19 +1568,8 @@ void TKeyValueState::ProcessCmds(THolder<TIntermediate> &intermediate, ISimpleDb
success = success && CheckCmds(intermediate, ctx, keys, info);
success = success && CheckCmdGetStatus(intermediate, ctx, keys, info);
if (!success) {
- for (const auto& cmd : intermediate->Writes) {
- for (const TLogoBlobID& logoBlobId : cmd.LogoBlobIds) {
- Dereference(logoBlobId, db, ctx, true);
- }
- }
- for (const auto& cmd : intermediate->Commands) {
- if (!std::holds_alternative<TIntermediate::TWrite>(cmd)) {
- continue;
- }
- auto& write = std::get<TIntermediate::TWrite>(cmd);
- for (const TLogoBlobID& logoBlobId : write.LogoBlobIds) {
- Dereference(logoBlobId, db, ctx, true);
- }
+ for (const auto& [logoBlobId, initial] : std::exchange(intermediate->RefCountsIncr, {})) {
+ Dereference(logoBlobId, db, ctx, initial);
}
} else {
// Read + validate
@@ -2141,7 +2139,8 @@ bool TKeyValueState::PrepareCmdDelete(const TActorContext &ctx, NKikimrClient::T
return false;
}
-void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx) {
+void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx,
+ TIntermediate *intermediate) {
if (isInline) {
cmd.Status = NKikimrProto::SCHEDULED;
cmd.StatusFlags = TStorageStatusFlags(ui32(NKikimrBlobStorage::StatusIsValid));
@@ -2162,6 +2161,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u
for (const TLogoBlobID& logoBlobId : cmd.LogoBlobIds) {
ui32 newRefCount = ++RefCounts[logoBlobId];
Y_VERIFY(newRefCount == 1);
+ intermediate->RefCountsIncr.emplace_back(logoBlobId, true);
}
}
}
@@ -2249,7 +2249,7 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK
break;
}
}
- SplitIntoBlobs(interm, isInline, storageChannelIdx);
+ SplitIntoBlobs(interm, isInline, storageChannelIdx, intermediate.Get());
}
return false;
}
@@ -2463,7 +2463,7 @@ TPrepareResult TKeyValueState::PrepareOneCmd(const TCommand::Write &request, THo
storageChannelIdx = BLOB_CHANNEL;
}
}
- SplitIntoBlobs(cmd, isInline, storageChannelIdx);
+ SplitIntoBlobs(cmd, isInline, storageChannelIdx, intermediate.Get());
return {};
}
@@ -2712,14 +2712,9 @@ bool TKeyValueState::PrepareExecuteTransactionRequest(const TActorContext &ctx,
TPrepareResult result = PrepareCommands(request, intermediate, info, ctx);
if (result.WithError) {
- // discard allocated LogoBlobIds from the Write commands
- for (ui32 idx : intermediate->WriteIndices) {
- const auto &cmd = intermediate->Commands[idx];
- Y_VERIFY(std::holds_alternative<TIntermediate::TWrite>(cmd));
- auto &write = std::get<TIntermediate::TWrite>(cmd);
- for (const TLogoBlobID &id : write.LogoBlobIds) {
- const ui32 count = RefCounts.erase(id);
- Y_VERIFY(count == 1);
+ for (const auto& [logoBlobId, initial] : std::exchange(intermediate->RefCountsIncr, {})) {
+ if (const auto it = RefCounts.find(logoBlobId); it != RefCounts.end() && !--it->second) {
+ RefCounts.erase(it);
}
}
@@ -3101,21 +3096,9 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol
intermediate->ConcatCount = request.CmdConcatSize();
if (error) {
- // discard allocated LogoBlobIds from the Write commands
- for (const auto &write : intermediate->Writes) {
- for (const TLogoBlobID &id : write.LogoBlobIds) {
- const ui32 count = RefCounts.erase(id);
- Y_VERIFY(count == 1);
- }
- }
-
- for (ui32 idx : intermediate->WriteIndices) {
- const auto &cmd = intermediate->Commands[idx];
- Y_VERIFY(std::holds_alternative<TIntermediate::TWrite>(cmd));
- auto &write = std::get<TIntermediate::TWrite>(cmd);
- for (const TLogoBlobID &id : write.LogoBlobIds) {
- const ui32 count = RefCounts.erase(id);
- Y_VERIFY(count == 1);
+ for (const auto& [logoBlobId, initial] : std::exchange(intermediate->RefCountsIncr, {})) {
+ if (const auto it = RefCounts.find(logoBlobId); it != RefCounts.end() && !--it->second) {
+ RefCounts.erase(it);
}
}
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index f57a69d35d..312a2115cf 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -337,31 +337,31 @@ public:
void ProcessCmd(TIntermediate::TRead &read,
NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(TIntermediate::TRangeRead &request,
NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(TIntermediate::TWrite &request,
NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TDelete &request,
NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TRename &request,
NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TCopyRange &request,
NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TConcat &request,
NKikimrClient::TKeyValueResponse::TConcatResult *resplegacyResponseonse,
NKikimrKeyValue::Channel *response,
- ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime);
+ ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void CmdRead(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx);
void CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx);
void CmdRename(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx);
@@ -492,7 +492,7 @@ public:
return false;
}
- void SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx);
+ void SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, ui32 storageChannelIdx, TIntermediate *intermediate);
bool PrepareCmdRead(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,
THolder<TIntermediate> &intermediate, bool &outIsInlineOnly);