diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-26 16:29:35 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-26 16:29:35 +0300 |
commit | 1f6b57071583f89299bb5abd3863d594f23c5be5 (patch) | |
tree | 2b29cf80624551b5c89a45fab5cce399ad00beeb | |
parent | 66ff7742308d5402fb5e43c059763d67486a97d3 (diff) | |
download | ydb-1f6b57071583f89299bb5abd3863d594f23c5be5.tar.gz |
Zero-copy in KV KIKIMR-18849
-rw-r--r-- | ydb/core/keyvalue/keyvalue_index_record.cpp | 51 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_index_record.h | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.cpp | 10 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.h | 9 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 60 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request.cpp | 54 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_request.cpp | 17 | ||||
-rw-r--r-- | ydb/core/protos/msgbus_kv.proto | 16 |
9 files changed, 133 insertions, 90 deletions
diff --git a/ydb/core/keyvalue/keyvalue_index_record.cpp b/ydb/core/keyvalue/keyvalue_index_record.cpp index 35b4f8c1e0..120fe31fdf 100644 --- a/ydb/core/keyvalue/keyvalue_index_record.cpp +++ b/ydb/core/keyvalue/keyvalue_index_record.cpp @@ -11,7 +11,7 @@ TIndexRecord::TChainItem::TChainItem(const TLogoBlobID &id, ui64 offset) { } -TIndexRecord::TChainItem::TChainItem(TRcBuf&& inlineData, ui64 offset) +TIndexRecord::TChainItem::TChainItem(TRope&& inlineData, ui64 offset) : InlineData(std::move(inlineData)) , Offset(offset) { @@ -64,16 +64,12 @@ ui32 TIndexRecord::GetReadItems(ui64 offset, ui64 size, TIntermediate::TRead& re Y_VERIFY(it != Chain.end()); ui32 readSize = Min<ui64>(size, it->GetSize() - offset); if (it->IsInline()) { - if (read.Value.size() != read.ValueSize) { - read.Value.resize(read.ValueSize); - } - Y_VERIFY(it->InlineData.size() >= readSize + offset, "size# %" PRIu64 " read# %" PRIu64 " offset# %" PRIu64, - (ui64)it->InlineData.size(), (ui64)readSize, (ui64)offset); - Y_VERIFY(read.ValueSize >= readSize + valueOffset); - memcpy(const_cast<char *>(read.Value.data()) + valueOffset, it->InlineData.data() + offset, readSize); + const auto& rope = it->InlineData; + const auto begin = rope.Position(offset); + const auto end = begin + readSize; + read.Value.Write(valueOffset, TRope(begin, end)); } else { - read.ReadItems.push_back(TIntermediate::TRead::TReadItem( - it->LogoBlobId, static_cast<ui32>(offset), readSize, valueOffset)); + read.ReadItems.emplace_back(it->LogoBlobId, static_cast<ui32>(offset), readSize, valueOffset); } size -= readSize; offset = 0; @@ -105,17 +101,18 @@ TString TIndexRecord::Serialize() const { data->CreationUnixTime = CreationUnixTime; ui64 offset = 0; - for (ui32 i = 0; i < Chain.size(); ++i) { - if (Chain[i].IsInline()) { + for (const auto& item : Chain) { + if (item.IsInline()) { memset(data->Serialized + offset, 0, sizeof(ui64)); offset += sizeof(ui64); - ui32 size = Chain[i].InlineData.size(); + ui32 size = item.InlineData.size(); memcpy(data->Serialized + offset, &size, sizeof(ui32)); offset += sizeof(ui32); - memcpy(data->Serialized + offset, Chain[i].InlineData.data(), Chain[i].InlineData.size()); - offset += Chain[i].InlineData.size(); + auto& rope = item.InlineData; + rope.begin().ExtractPlainDataAndAdvance(data->Serialized + offset, rope.size()); + offset += rope.size(); } else { - memcpy(data->Serialized + offset, &Chain[i].LogoBlobId, sizeof(TLogoBlobID)); + memcpy(data->Serialized + offset, &item.LogoBlobId, sizeof(TLogoBlobID)); offset += sizeof(TLogoBlobID); } } @@ -169,15 +166,17 @@ bool TIndexRecord::Deserialize1(const TString &rawData, TString &outErrorInfo) { bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) { Y_VERIFY(rawData.size() >= sizeof(TKeyValueData2)); - const TKeyValueData2 *data = (const TKeyValueData2 *)rawData.data(); - if (!data->CheckChecksum(rawData.size())) { + TRcBuf rawDataBuffer(rawData); // encode TString into TRcBuf to slice it further + const TContiguousSpan rawDataSpan = rawDataBuffer.GetContiguousSpan(); + const TKeyValueData2 *data = reinterpret_cast<const TKeyValueData2*>(rawDataSpan.data()); + if (!data->CheckChecksum(rawDataSpan.size())) { TStringStream str; - str << " data->CheckChecksum(rawData.size)# ERROR "; + str << " data->CheckChecksum(rawDataSpan.size)# ERROR "; str << " CreationUnixTime# " << data->CreationUnixTime; - str << " rawData.size# " << rawData.size(); + str << " rawDataSpan.size# " << rawDataSpan.size(); str << " data# "; - for (ui32 i = 0; i < rawData.size(); ++i) { - ui8 d = ((const ui8*)rawData.data())[i]; + for (ui32 i = 0; i < rawDataSpan.size(); ++i) { + ui8 d = ((const ui8*)rawDataSpan.data())[i]; str << Sprintf("%02x", (ui32)d); } outErrorInfo = str.Str(); @@ -186,7 +185,7 @@ bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) { Y_VERIFY(data->DataHeader.ItemType == EIT_KEYVALUE_2); CreationUnixTime = data->CreationUnixTime; ui64 chainOffset = 0; - ui64 endOffset = rawData.size() - sizeof(TKeyValueData2); + ui64 endOffset = rawDataSpan.size() - sizeof(TKeyValueData2); ui64 offset = 0; while (offset < endOffset) { if (endOffset - offset < sizeof(ui64)) { @@ -218,8 +217,10 @@ bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) { outErrorInfo = " Deserialization error# DEA4"; return false; } - TRcBuf inlineData = TRcBuf::Uninitialized(size); - memcpy(inlineData.GetDataMut(), data->Serialized + offset, size); + TRope inlineData; + if (size) { + inlineData = TRcBuf(TRcBuf::Piece, data->Serialized + offset, size, rawDataBuffer); + } offset += size; Chain.push_back(TIndexRecord::TChainItem(std::move(inlineData), chainOffset)); chainOffset += size; diff --git a/ydb/core/keyvalue/keyvalue_index_record.h b/ydb/core/keyvalue/keyvalue_index_record.h index f4797861ab..80ca38a42b 100644 --- a/ydb/core/keyvalue/keyvalue_index_record.h +++ b/ydb/core/keyvalue/keyvalue_index_record.h @@ -10,11 +10,11 @@ namespace NKeyValue { struct TIndexRecord { struct TChainItem { TLogoBlobID LogoBlobId; - TRcBuf InlineData; + TRope InlineData; ui64 Offset; TChainItem(const TLogoBlobID &id, ui64 offset); - TChainItem(TRcBuf&& inlineData, ui64 offset); + TChainItem(TRope&& inlineData, ui64 offset); bool IsInline() const; ui64 GetSize() const; diff --git a/ydb/core/keyvalue/keyvalue_intermediate.cpp b/ydb/core/keyvalue/keyvalue_intermediate.cpp index 82a78d9fa2..16e6cb06eb 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.cpp +++ b/ydb/core/keyvalue/keyvalue_intermediate.cpp @@ -51,6 +51,12 @@ NKikimrProto::EReplyStatus TIntermediate::TRead::CumulativeStatus() const { } } +TRope TIntermediate::TRead::BuildRope() { + TRope rope = Value ? Value.GetMonolith() : TRope(); + Y_VERIFY(!Value || rope.size() == ValueSize); + return rope; +} + TIntermediate::TIntermediate(TActorId respondTo, TActorId keyValueActorId, ui64 channelGeneration, ui64 channelStep, TRequestType::EType requestType) : Cookie(0) @@ -81,7 +87,7 @@ void TIntermediate::UpdateStat() { Stat.ReadNodata++; } else if (read.Status == NKikimrProto::OK) { Stat.Reads++; - Stat.ReadBytes += read.Value.size(); + Stat.ReadBytes += read.ValueSize; } }; auto checkRangeRead = [&] (const auto &range) { @@ -91,7 +97,7 @@ void TIntermediate::UpdateStat() { Stat.RangeReadItemsNodata++; } else if (read.Status == NKikimrProto::OK) { Stat.RangeReadItems++; - Stat.RangeReadBytes += read.Value.size(); + Stat.RangeReadBytes += read.ValueSize; } } } else { diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index 8df55fcaed..3812a8e7fa 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -7,6 +7,7 @@ #include <ydb/core/protos/base.pb.h> #include <ydb/core/protos/msgbus_kv.pb.h> #include <ydb/core/protos/msgbus.pb.h> +#include <ydb/core/util/fragmented_buffer.h> #include <ydb/core/keyvalue/protos/events.pb.h> namespace NKikimr { @@ -34,7 +35,7 @@ struct TIntermediate { TVector<TReadItem> ReadItems; TString Key; - TString Value; + TFragmentedBuffer Value; ui32 Offset; ui32 Size; ui32 ValueSize; @@ -50,6 +51,7 @@ struct TIntermediate { NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel); NKikimrProto::EReplyStatus ItemsStatus() const; NKikimrProto::EReplyStatus CumulativeStatus() const; + TRope BuildRope(); }; struct TRangeRead { TDeque<TRead> Reads; @@ -61,7 +63,7 @@ struct TIntermediate { struct TWrite { TVector<TLogoBlobID> LogoBlobIds; TString Key; - TRcBuf Data; + TRope Data; TEvBlobStorage::TEvPut::ETactic Tactic; NKikimrBlobStorage::EPutHandleClass HandleClass; NKikimrProto::EReplyStatus Status; @@ -148,9 +150,12 @@ struct TIntermediate { bool IsReplied; + bool UsePayloadInResponse = false; + TRequestStat Stat; NKikimrClient::TResponse Response; + std::vector<TRope> Payload; NKikimrKeyValue::ExecuteTransactionResult ExecuteTransactionResponse; NKikimrKeyValue::GetStorageChannelStatusResult GetStorageChannelStatusResponse; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index aba59e184f..f05fc122e1 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -875,6 +875,9 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon if (intermediate->EvType == TEvKeyValue::TEvRequest::EventType) { THolder<TEvKeyValue::TEvResponse> response(new TEvKeyValue::TEvResponse); response->Record = intermediate->Response; + for (auto& item : intermediate->Payload) { + response->AddPayload(std::move(item)); + } ResourceMetrics->Network.Increment(response->Record.ByteSize()); ctx.Send(intermediate->RespondTo, response.Release()); } @@ -918,16 +921,22 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon void TKeyValueState::ProcessCmd(TIntermediate::TRead &request, NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse, - NKikimrKeyValue::StorageChannel */*response*/, + NKikimrKeyValue::StorageChannel* /*response*/, ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, - TIntermediate* /*intermediate*/) + TIntermediate *intermediate) { NKikimrProto::EReplyStatus outStatus = request.CumulativeStatus(); request.Status = outStatus; legacyResponse->SetStatus(outStatus); if (outStatus == NKikimrProto::OK) { - legacyResponse->SetValue(request.Value); - Y_VERIFY(request.Value.size() == request.ValueSize); + TRope value = request.BuildRope(); + if (intermediate->UsePayloadInResponse) { + legacyResponse->SetPayloadId(intermediate->Payload.size()); + intermediate->Payload.push_back(std::move(value)); + } else { + const TContiguousSpan span(value.GetContiguousSpan()); + legacyResponse->SetValue(span.data(), span.size()); + } } else { legacyResponse->SetMessage(request.Message); if (outStatus == NKikimrProto::NODATA) { @@ -960,7 +969,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request, NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse, NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, - TIntermediate* /*intermediate*/) + TIntermediate *intermediate) { for (ui64 r = 0; r < request.Reads.size(); ++r) { auto &read = request.Reads[r]; @@ -1003,8 +1012,14 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request, resultKv->SetStatus(outStatus); resultKv->SetKey(read.Key); if (request.IncludeData && (outStatus == NKikimrProto::OK || outStatus == NKikimrProto::OVERRUN)) { - resultKv->SetValue(read.Value); - Y_VERIFY(read.Value.size() == read.ValueSize); + TRope value = read.BuildRope(); + if (intermediate->UsePayloadInResponse) { + resultKv->SetPayloadId(intermediate->Payload.size()); + intermediate->Payload.push_back(std::move(value)); + } else { + const TContiguousSpan span = value.GetContiguousSpan(); + resultKv->SetValue(span.data(), span.size()); + } } resultKv->SetValueSize(read.ValueSize); resultKv->SetCreationUnixTime(read.CreationUnixTime); @@ -1037,7 +1052,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, record.Chain = {}; ui32 storage_channel = 0; if (request.Status == NKikimrProto::SCHEDULED) { - TRcBuf inlineData = request.Data; + TRope inlineData = request.Data; const size_t size = inlineData.size(); record.Chain.push_back(TIndexRecord::TChainItem(std::move(inlineData), 0)); CountWriteRecord(0, size); @@ -1177,7 +1192,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request, for (TIndexRecord::TChainItem& chainItem : input.Chain) { if (chainItem.IsInline()) { - chain.push_back(TIndexRecord::TChainItem(TRcBuf(chainItem.InlineData), offset)); + chain.push_back(TIndexRecord::TChainItem(TRope(chainItem.InlineData), offset)); } else { const TLogoBlobID& id = chainItem.LogoBlobId; chain.push_back(TIndexRecord::TChainItem(id, offset)); @@ -2188,7 +2203,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u } bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, - THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) { + TEvKeyValue::TEvRequest& ev, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) { intermediate->WriteIndices.reserve(kvRequest.CmdWriteSize()); for (ui32 i = 0; i < kvRequest.CmdWriteSize(); ++i) { auto& request = kvRequest.GetCmdWrite(i); @@ -2203,10 +2218,10 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); return true; } - if (!request.HasValue()) { + if (request.GetDataCase() == NKikimrClient::TKeyValueRequest::TCmdWrite::DATA_NOT_SET) { TStringStream str; str << "KeyValue# " << TabletId; - str << " Missing Value in CmdWrite(" << i << ") Marker# KV10"; + str << " Missing Value/PayloadId in CmdWrite(" << i << ") Marker# KV10"; ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); return true; } @@ -2249,7 +2264,20 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK } interm.Key = request.GetKey(); - interm.Data = TRcBuf(request.GetValue()); + + switch (request.GetDataCase()) { + case NKikimrClient::TKeyValueRequest::TCmdWrite::kValue: + interm.Data = TRope(request.GetValue()); + break; + + case NKikimrClient::TKeyValueRequest::TCmdWrite::kPayloadId: + interm.Data = ev.GetPayload(request.GetPayloadId()); + break; + + case NKikimrClient::TKeyValueRequest::TCmdWrite::DATA_NOT_SET: + Y_UNREACHABLE(); + } + interm.Tactic = TEvBlobStorage::TEvPut::TacticDefault; switch (request.GetTactic()) { case NKikimrClient::TKeyValueRequest::MIN_LATENCY: @@ -2449,7 +2477,7 @@ TPrepareResult TKeyValueState::PrepareOneCmd(const TCommand::Write &request, THo intermediate->Commands.emplace_back(TIntermediate::TWrite()); auto &cmd = std::get<TIntermediate::TWrite>(intermediate->Commands.back()); cmd.Key = request.key(); - cmd.Data = TRcBuf(request.value()); + cmd.Data = TRope(request.value()); switch (request.tactic()) { case TCommand::Write::TACTIC_MIN_LATENCY: cmd.Tactic = TEvBlobStorage::TEvPut::TacticMinLatency; @@ -3126,6 +3154,8 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol } intermediate->HasIncrementGeneration = request.HasCmdIncrementGeneration(); + intermediate->UsePayloadInResponse = request.GetUsePayloadInResponse(); + if (CheckDeadline(ctx, request, intermediate)) { return false; } @@ -3152,7 +3182,7 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol error = error || PrepareCmdRename(ctx, request, intermediate); error = error || PrepareCmdConcat(ctx, request, intermediate); error = error || PrepareCmdDelete(ctx, request, intermediate); - error = error || PrepareCmdWrite(ctx, request, intermediate, info); + error = error || PrepareCmdWrite(ctx, request, *ev->Get(), intermediate, info); error = error || PrepareCmdGetStatus(ctx, request, intermediate, info); error = error || PrepareCmdTrimLeakedBlobs(ctx, request, intermediate, info); error = error || PrepareCmdSetExecutorFastLogPolicy(ctx, request, intermediate, info); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 901cd5d19e..2f1afd766d 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -530,7 +530,7 @@ public: THolder<TIntermediate> &intermediate); bool PrepareCmdDelete(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, THolder<TIntermediate> &intermediate); - bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, + bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, TEvKeyValue::TEvRequest& ev, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info); bool PrepareCmdGetStatus(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info); diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp index 505716e6a2..c73420dd90 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp @@ -3,6 +3,7 @@ #include <ydb/core/util/stlog.h> #include <library/cpp/actors/protos/services_common.pb.h> +#include <util/generic/overloaded.h> namespace NKikimr { @@ -62,16 +63,6 @@ public: return std::holds_alternative<TIntermediate::TRead>(GetCommand()); } - bool IsRangeRead() const { - return std::holds_alternative<TIntermediate::TRangeRead>(GetCommand()); - } - - void AddRead(TIntermediate::TRead &read) { - for (auto &readItem : read.ReadItems) { - ReadItems.push_back({&read, &readItem}); - } - } - NKikimrBlobStorage::EGetHandleClass GetHandleClass() const { auto visitor = [&] (auto &request) { return request.HandleClass; @@ -99,19 +90,22 @@ public: } ui32 readCount = 0; - auto addReadItems = [&](auto &request) { - using Type = std::decay_t<decltype(request)>; - if constexpr (std::is_same_v<Type, TIntermediate::TRead>) { - AddRead(request); - readCount++; - } else { - for (auto &read : request.Reads) { - AddRead(read); - readCount++; - } + auto addRead = [&](TIntermediate::TRead& read) { + for (auto& readItem : read.ReadItems) { + ReadItems.push_back({&read, &readItem}); } + ++readCount; }; - std::visit(addReadItems, GetCommand()); + std::visit(TOverloaded{ + [&](TIntermediate::TRead& read) { + addRead(read); + }, + [&](TIntermediate::TRangeRead& rangeRead) { + for (auto& read : rangeRead.Reads) { + addRead(read); + } + } + }, GetCommand()); if (ReadItems.empty()) { auto getStatus = [&](auto &request) { @@ -284,9 +278,6 @@ public: read.Status = response.Status; if (response.Status == NKikimrProto::OK) { - if (read.Value.size() != read.ValueSize) { - read.Value.resize(read.ValueSize); - } Y_VERIFY_S(response.Buffer.size() == readItem.BlobSize, "response.Buffer.size()# " << response.Buffer.size() << " readItem.BlobSize# " << readItem.BlobSize); @@ -294,11 +285,9 @@ public: "readItem.ValueOffset# " << readItem.ValueOffset << " readItem.BlobSize# " << readItem.BlobSize << " read.ValueSize# " << read.ValueSize); - Y_VERIFY(read.Value.IsDetached()); - response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size()); IntermediateResult->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), batch.GroupId)] += response.Buffer.size(); - // FIXME: count distinct blobs?" keyvalue_storage_request.cpp:279 IntermediateResult->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), batch.GroupId)] += 1; + read.Value.Write(readItem.ValueOffset, std::move(response.Buffer)); } else { Y_VERIFY_DEBUG_S(response.Status != NKikimrProto::NODATA, "NODATA received for TEvGet" << " TabletId# " << TabletInfo->TabletID @@ -414,7 +403,10 @@ public: response->Record.set_requested_key(interRead.Key); response->Record.set_requested_offset(interRead.Offset); response->Record.set_requested_size(interRead.RequestedSize); - response->Record.set_value(interRead.Value); + + TRope value = interRead.BuildRope(); + const TContiguousSpan span = value.GetContiguousSpan(); + response->Record.set_value(span.data(), span.size()); if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) { response->Record.set_node_id(SelfId().NodeId()); @@ -455,7 +447,11 @@ public: for (auto &interRead : interRange.Reads) { auto *kvp = readRangeResult.add_pair(); kvp->set_key(interRead.Key); - kvp->set_value(interRead.Value); + + TRope value = interRead.BuildRope(); + const TContiguousSpan span = value.GetContiguousSpan(); + kvp->set_value(span.data(), span.size()); + kvp->set_value_size(interRead.ValueSize); kvp->set_creation_unix_time(interRead.CreationUnixTime); ui32 storageChannel = MainStorageChannelInPublicApi; diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp index 84005ead26..b5e546a1df 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp @@ -285,15 +285,11 @@ public: auto& readItem = *it->ReadItem; if (response.Status == NKikimrProto::OK) { - if (read.Value.size() != read.ValueSize) { - read.Value.resize(read.ValueSize); - } Y_VERIFY(response.Buffer.size() == readItem.BlobSize); Y_VERIFY(readItem.ValueOffset + readItem.BlobSize <= read.ValueSize); - Y_VERIFY(read.Value.IsDetached()); - response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size()); IntermediateResults->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), groupId)] += response.Buffer.size(); IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs? + read.Value.Write(readItem.ValueOffset, std::move(response.Buffer)); } else { TStringStream err; if (read.Message.size()) { @@ -633,15 +629,15 @@ public: if (request.Status != NKikimrProto::SCHEDULED) { Y_VERIFY(request.Status == NKikimrProto::UNKNOWN); - const TRcBuf& data = request.Data; - const TContiguousSpan whole = data.GetContiguousSpan(); + const TRope& data = request.Data; + auto iter = data.begin(); - ui64 offset = 0; for (const TLogoBlobID& logoBlobId : request.LogoBlobIds) { - const TContiguousSpan chunk = whole.SubSpan(offset, logoBlobId.BlobSize()); + const auto begin = iter; + iter += logoBlobId.BlobSize(); THolder<TEvBlobStorage::TEvPut> put( new TEvBlobStorage::TEvPut( - logoBlobId, TRcBuf(TRcBuf::Piece, chunk.data(), chunk.size(), data), + logoBlobId, TRcBuf(TRope(begin, iter)), IntermediateResults->Deadline, request.HandleClass, request.Tactic)); const ui32 groupId = TabletInfo->GroupFor(logoBlobId.Channel(), logoBlobId.Generation()); @@ -654,7 +650,6 @@ public: SendPutToGroup(ctx, groupId, TabletInfo.Get(), std::move(put), i); ++WriteRequestsSent; - offset += logoBlobId.BlobSize(); } } } diff --git a/ydb/core/protos/msgbus_kv.proto b/ydb/core/protos/msgbus_kv.proto index 87c65e5176..c8317d9d4c 100644 --- a/ydb/core/protos/msgbus_kv.proto +++ b/ydb/core/protos/msgbus_kv.proto @@ -52,7 +52,10 @@ message TKeyValueRequest { } message TCmdWrite { optional bytes Key = 1; // mandatory - optional bytes Value = 2; // mandatory + oneof Data { // mandatory + bytes Value = 2; + uint32 PayloadId = 8; + } optional EStorageChannel StorageChannel = 3; // (default = MAIN) optional EPriority Priority = 4; // (default = REALTIME) optional bytes KeyToCache = 5; // used in PQ @@ -86,6 +89,7 @@ message TKeyValueRequest { optional uint64 TabletId = 1; // mandatory optional uint64 Generation = 2; // optional, no generation check is done if missing optional uint64 Cookie = 3; + optional bool UsePayloadInResponse = 16; // use PayloadId instead of Value in TKeyValueResponse repeated TCmdDeleteRange CmdDeleteRange = 4; optional TCmdIncrementGeneration CmdIncrementGeneration = 5; // conflicts with any other Cmd, must be the only one repeated TCmdRead CmdRead = 6; @@ -108,7 +112,10 @@ message TKeyValueResponse { message TKeyValuePair { optional bytes Key = 1; - optional bytes Value = 2; + oneof Data { + bytes Value = 2; + uint32 PayloadId = 7; + } optional uint32 ValueSize = 3; optional uint64 CreationUnixTime = 4; optional TKeyValueRequest.EStorageChannel StorageChannel = 5; // Returns the _actual_ storage channel @@ -124,7 +131,10 @@ message TKeyValueResponse { } message TReadResult { optional uint32 Status = 1; // EReplyStatus from ydb/core/protos/base.proto - optional bytes Value = 2; + oneof Data { + bytes Value = 2; + uint32 PayloadId = 4; + } optional string Message = 3; } message TReadRangeResult { |