about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: alexvru <alexvru@ydb.tech> 2023-07-26 16:29:35 +0300
committer: alexvru <alexvru@ydb.tech> 2023-07-26 16:29:35 +0300
commit: 1f6b57071583f89299bb5abd3863d594f23c5be5 (patch)
tree: 2b29cf80624551b5c89a45fab5cce399ad00beeb
parent: 66ff7742308d5402fb5e43c059763d67486a97d3 (diff)
download: ydb-1f6b57071583f89299bb5abd3863d594f23c5be5.tar.gz
Zero-copy in KV KIKIMR-18849
-rw-r--r-- ydb/core/keyvalue/keyvalue_index_record.cpp 51
-rw-r--r-- ydb/core/keyvalue/keyvalue_index_record.h 4
-rw-r--r-- ydb/core/keyvalue/keyvalue_intermediate.cpp 10
-rw-r--r-- ydb/core/keyvalue/keyvalue_intermediate.h 9
-rw-r--r-- ydb/core/keyvalue/keyvalue_state.cpp 60
-rw-r--r-- ydb/core/keyvalue/keyvalue_state.h 2
-rw-r--r-- ydb/core/keyvalue/keyvalue_storage_read_request.cpp 54
-rw-r--r-- ydb/core/keyvalue/keyvalue_storage_request.cpp 17
-rw-r--r-- ydb/core/protos/msgbus_kv.proto 16
9 files changed, 133 insertions, 90 deletions
diff --git a/ydb/core/keyvalue/keyvalue_index_record.cpp b/ydb/core/keyvalue/keyvalue_index_record.cpp
index 35b4f8c1e0..120fe31fdf 100644
--- a/ydb/core/keyvalue/keyvalue_index_record.cpp
+++ b/ydb/core/keyvalue/keyvalue_index_record.cpp
@@ -11,7 +11,7 @@ TIndexRecord::TChainItem::TChainItem(const TLogoBlobID &id, ui64 offset)
{
}
-TIndexRecord::TChainItem::TChainItem(TRcBuf&& inlineData, ui64 offset)
+TIndexRecord::TChainItem::TChainItem(TRope&& inlineData, ui64 offset)
: InlineData(std::move(inlineData))
, Offset(offset)
{
@@ -64,16 +64,12 @@ ui32 TIndexRecord::GetReadItems(ui64 offset, ui64 size, TIntermediate::TRead& re
Y_VERIFY(it != Chain.end());
ui32 readSize = Min<ui64>(size, it->GetSize() - offset);
if (it->IsInline()) {
- if (read.Value.size() != read.ValueSize) {
- read.Value.resize(read.ValueSize);
- }
- Y_VERIFY(it->InlineData.size() >= readSize + offset, "size# %" PRIu64 " read# %" PRIu64 " offset# %" PRIu64,
- (ui64)it->InlineData.size(), (ui64)readSize, (ui64)offset);
- Y_VERIFY(read.ValueSize >= readSize + valueOffset);
- memcpy(const_cast<char *>(read.Value.data()) + valueOffset, it->InlineData.data() + offset, readSize);
+ const auto& rope = it->InlineData;
+ const auto begin = rope.Position(offset);
+ const auto end = begin + readSize;
+ read.Value.Write(valueOffset, TRope(begin, end));
} else {
- read.ReadItems.push_back(TIntermediate::TRead::TReadItem(
- it->LogoBlobId, static_cast<ui32>(offset), readSize, valueOffset));
+ read.ReadItems.emplace_back(it->LogoBlobId, static_cast<ui32>(offset), readSize, valueOffset);
}
size -= readSize;
offset = 0;
@@ -105,17 +101,18 @@ TString TIndexRecord::Serialize() const {
data->CreationUnixTime = CreationUnixTime;
ui64 offset = 0;
- for (ui32 i = 0; i < Chain.size(); ++i) {
- if (Chain[i].IsInline()) {
+ for (const auto& item : Chain) {
+ if (item.IsInline()) {
memset(data->Serialized + offset, 0, sizeof(ui64));
offset += sizeof(ui64);
- ui32 size = Chain[i].InlineData.size();
+ ui32 size = item.InlineData.size();
memcpy(data->Serialized + offset, &size, sizeof(ui32));
offset += sizeof(ui32);
- memcpy(data->Serialized + offset, Chain[i].InlineData.data(), Chain[i].InlineData.size());
- offset += Chain[i].InlineData.size();
+ auto& rope = item.InlineData;
+ rope.begin().ExtractPlainDataAndAdvance(data->Serialized + offset, rope.size());
+ offset += rope.size();
} else {
- memcpy(data->Serialized + offset, &Chain[i].LogoBlobId, sizeof(TLogoBlobID));
+ memcpy(data->Serialized + offset, &item.LogoBlobId, sizeof(TLogoBlobID));
offset += sizeof(TLogoBlobID);
}
}
@@ -169,15 +166,17 @@ bool TIndexRecord::Deserialize1(const TString &rawData, TString &outErrorInfo) {
bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) {
Y_VERIFY(rawData.size() >= sizeof(TKeyValueData2));
- const TKeyValueData2 *data = (const TKeyValueData2 *)rawData.data();
- if (!data->CheckChecksum(rawData.size())) {
+ TRcBuf rawDataBuffer(rawData); // encode TString into TRcBuf to slice it further
+ const TContiguousSpan rawDataSpan = rawDataBuffer.GetContiguousSpan();
+ const TKeyValueData2 *data = reinterpret_cast<const TKeyValueData2*>(rawDataSpan.data());
+ if (!data->CheckChecksum(rawDataSpan.size())) {
TStringStream str;
- str << " data->CheckChecksum(rawData.size)# ERROR ";
+ str << " data->CheckChecksum(rawDataSpan.size)# ERROR ";
str << " CreationUnixTime# " << data->CreationUnixTime;
- str << " rawData.size# " << rawData.size();
+ str << " rawDataSpan.size# " << rawDataSpan.size();
str << " data# ";
- for (ui32 i = 0; i < rawData.size(); ++i) {
- ui8 d = ((const ui8*)rawData.data())[i];
+ for (ui32 i = 0; i < rawDataSpan.size(); ++i) {
+ ui8 d = ((const ui8*)rawDataSpan.data())[i];
str << Sprintf("%02x", (ui32)d);
}
outErrorInfo = str.Str();
@@ -186,7 +185,7 @@ bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) {
Y_VERIFY(data->DataHeader.ItemType == EIT_KEYVALUE_2);
CreationUnixTime = data->CreationUnixTime;
ui64 chainOffset = 0;
- ui64 endOffset = rawData.size() - sizeof(TKeyValueData2);
+ ui64 endOffset = rawDataSpan.size() - sizeof(TKeyValueData2);
ui64 offset = 0;
while (offset < endOffset) {
if (endOffset - offset < sizeof(ui64)) {
@@ -218,8 +217,10 @@ bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) {
outErrorInfo = " Deserialization error# DEA4";
return false;
}
- TRcBuf inlineData = TRcBuf::Uninitialized(size);
- memcpy(inlineData.GetDataMut(), data->Serialized + offset, size);
+ TRope inlineData;
+ if (size) {
+ inlineData = TRcBuf(TRcBuf::Piece, data->Serialized + offset, size, rawDataBuffer);
+ }
offset += size;
Chain.push_back(TIndexRecord::TChainItem(std::move(inlineData), chainOffset));
chainOffset += size;
diff --git a/ydb/core/keyvalue/keyvalue_index_record.h b/ydb/core/keyvalue/keyvalue_index_record.h
index f4797861ab..80ca38a42b 100644
--- a/ydb/core/keyvalue/keyvalue_index_record.h
+++ b/ydb/core/keyvalue/keyvalue_index_record.h
@@ -10,11 +10,11 @@ namespace NKeyValue {
struct TIndexRecord {
struct TChainItem {
TLogoBlobID LogoBlobId;
- TRcBuf InlineData;
+ TRope InlineData;
ui64 Offset;
TChainItem(const TLogoBlobID &id, ui64 offset);
- TChainItem(TRcBuf&& inlineData, ui64 offset);
+ TChainItem(TRope&& inlineData, ui64 offset);
bool IsInline() const;
ui64 GetSize() const;
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.cpp b/ydb/core/keyvalue/keyvalue_intermediate.cpp
index 82a78d9fa2..16e6cb06eb 100644
--- a/ydb/core/keyvalue/keyvalue_intermediate.cpp
+++ b/ydb/core/keyvalue/keyvalue_intermediate.cpp
@@ -51,6 +51,12 @@ NKikimrProto::EReplyStatus TIntermediate::TRead::CumulativeStatus() const {
}
}
+TRope TIntermediate::TRead::BuildRope() {
+ TRope rope = Value ? Value.GetMonolith() : TRope();
+ Y_VERIFY(!Value || rope.size() == ValueSize);
+ return rope;
+}
+
TIntermediate::TIntermediate(TActorId respondTo, TActorId keyValueActorId, ui64 channelGeneration, ui64 channelStep,
TRequestType::EType requestType)
: Cookie(0)
@@ -81,7 +87,7 @@ void TIntermediate::UpdateStat() {
Stat.ReadNodata++;
} else if (read.Status == NKikimrProto::OK) {
Stat.Reads++;
- Stat.ReadBytes += read.Value.size();
+ Stat.ReadBytes += read.ValueSize;
}
};
auto checkRangeRead = [&] (const auto &range) {
@@ -91,7 +97,7 @@ void TIntermediate::UpdateStat() {
Stat.RangeReadItemsNodata++;
} else if (read.Status == NKikimrProto::OK) {
Stat.RangeReadItems++;
- Stat.RangeReadBytes += read.Value.size();
+ Stat.RangeReadBytes += read.ValueSize;
}
}
} else {
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h
index 8df55fcaed..3812a8e7fa 100644
--- a/ydb/core/keyvalue/keyvalue_intermediate.h
+++ b/ydb/core/keyvalue/keyvalue_intermediate.h
@@ -7,6 +7,7 @@
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/protos/msgbus_kv.pb.h>
#include <ydb/core/protos/msgbus.pb.h>
+#include <ydb/core/util/fragmented_buffer.h>
#include <ydb/core/keyvalue/protos/events.pb.h>
namespace NKikimr {
@@ -34,7 +35,7 @@ struct TIntermediate {
TVector<TReadItem> ReadItems;
TString Key;
- TString Value;
+ TFragmentedBuffer Value;
ui32 Offset;
ui32 Size;
ui32 ValueSize;
@@ -50,6 +51,7 @@ struct TIntermediate {
NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel);
NKikimrProto::EReplyStatus ItemsStatus() const;
NKikimrProto::EReplyStatus CumulativeStatus() const;
+ TRope BuildRope();
};
struct TRangeRead {
TDeque<TRead> Reads;
@@ -61,7 +63,7 @@ struct TIntermediate {
struct TWrite {
TVector<TLogoBlobID> LogoBlobIds;
TString Key;
- TRcBuf Data;
+ TRope Data;
TEvBlobStorage::TEvPut::ETactic Tactic;
NKikimrBlobStorage::EPutHandleClass HandleClass;
NKikimrProto::EReplyStatus Status;
@@ -148,9 +150,12 @@ struct TIntermediate {
bool IsReplied;
+ bool UsePayloadInResponse = false;
+
TRequestStat Stat;
NKikimrClient::TResponse Response;
+ std::vector<TRope> Payload;
NKikimrKeyValue::ExecuteTransactionResult ExecuteTransactionResponse;
NKikimrKeyValue::GetStorageChannelStatusResult GetStorageChannelStatusResponse;
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index aba59e184f..f05fc122e1 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -875,6 +875,9 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon
if (intermediate->EvType == TEvKeyValue::TEvRequest::EventType) {
THolder<TEvKeyValue::TEvResponse> response(new TEvKeyValue::TEvResponse);
response->Record = intermediate->Response;
+ for (auto& item : intermediate->Payload) {
+ response->AddPayload(std::move(item));
+ }
ResourceMetrics->Network.Increment(response->Record.ByteSize());
ctx.Send(intermediate->RespondTo, response.Release());
}
@@ -918,16 +921,22 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon
void TKeyValueState::ProcessCmd(TIntermediate::TRead &request,
NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse,
- NKikimrKeyValue::StorageChannel */*response*/,
+ NKikimrKeyValue::StorageChannel* /*response*/,
ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/,
- TIntermediate* /*intermediate*/)
+ TIntermediate *intermediate)
{
NKikimrProto::EReplyStatus outStatus = request.CumulativeStatus();
request.Status = outStatus;
legacyResponse->SetStatus(outStatus);
if (outStatus == NKikimrProto::OK) {
- legacyResponse->SetValue(request.Value);
- Y_VERIFY(request.Value.size() == request.ValueSize);
+ TRope value = request.BuildRope();
+ if (intermediate->UsePayloadInResponse) {
+ legacyResponse->SetPayloadId(intermediate->Payload.size());
+ intermediate->Payload.push_back(std::move(value));
+ } else {
+ const TContiguousSpan span(value.GetContiguousSpan());
+ legacyResponse->SetValue(span.data(), span.size());
+ }
} else {
legacyResponse->SetMessage(request.Message);
if (outStatus == NKikimrProto::NODATA) {
@@ -960,7 +969,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,
NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse,
NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/,
- TIntermediate* /*intermediate*/)
+ TIntermediate *intermediate)
{
for (ui64 r = 0; r < request.Reads.size(); ++r) {
auto &read = request.Reads[r];
@@ -1003,8 +1012,14 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,
resultKv->SetStatus(outStatus);
resultKv->SetKey(read.Key);
if (request.IncludeData && (outStatus == NKikimrProto::OK || outStatus == NKikimrProto::OVERRUN)) {
- resultKv->SetValue(read.Value);
- Y_VERIFY(read.Value.size() == read.ValueSize);
+ TRope value = read.BuildRope();
+ if (intermediate->UsePayloadInResponse) {
+ resultKv->SetPayloadId(intermediate->Payload.size());
+ intermediate->Payload.push_back(std::move(value));
+ } else {
+ const TContiguousSpan span = value.GetContiguousSpan();
+ resultKv->SetValue(span.data(), span.size());
+ }
}
resultKv->SetValueSize(read.ValueSize);
resultKv->SetCreationUnixTime(read.CreationUnixTime);
@@ -1037,7 +1052,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
record.Chain = {};
ui32 storage_channel = 0;
if (request.Status == NKikimrProto::SCHEDULED) {
- TRcBuf inlineData = request.Data;
+ TRope inlineData = request.Data;
const size_t size = inlineData.size();
record.Chain.push_back(TIndexRecord::TChainItem(std::move(inlineData), 0));
CountWriteRecord(0, size);
@@ -1177,7 +1192,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request,
for (TIndexRecord::TChainItem& chainItem : input.Chain) {
if (chainItem.IsInline()) {
- chain.push_back(TIndexRecord::TChainItem(TRcBuf(chainItem.InlineData), offset));
+ chain.push_back(TIndexRecord::TChainItem(TRope(chainItem.InlineData), offset));
} else {
const TLogoBlobID& id = chainItem.LogoBlobId;
chain.push_back(TIndexRecord::TChainItem(id, offset));
@@ -2188,7 +2203,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u
}
bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,
- THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) {
+ TEvKeyValue::TEvRequest& ev, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) {
intermediate->WriteIndices.reserve(kvRequest.CmdWriteSize());
for (ui32 i = 0; i < kvRequest.CmdWriteSize(); ++i) {
auto& request = kvRequest.GetCmdWrite(i);
@@ -2203,10 +2218,10 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK
ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate);
return true;
}
- if (!request.HasValue()) {
+ if (request.GetDataCase() == NKikimrClient::TKeyValueRequest::TCmdWrite::DATA_NOT_SET) {
TStringStream str;
str << "KeyValue# " << TabletId;
- str << " Missing Value in CmdWrite(" << i << ") Marker# KV10";
+ str << " Missing Value/PayloadId in CmdWrite(" << i << ") Marker# KV10";
ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate);
return true;
}
@@ -2249,7 +2264,20 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK
}
interm.Key = request.GetKey();
- interm.Data = TRcBuf(request.GetValue());
+
+ switch (request.GetDataCase()) {
+ case NKikimrClient::TKeyValueRequest::TCmdWrite::kValue:
+ interm.Data = TRope(request.GetValue());
+ break;
+
+ case NKikimrClient::TKeyValueRequest::TCmdWrite::kPayloadId:
+ interm.Data = ev.GetPayload(request.GetPayloadId());
+ break;
+
+ case NKikimrClient::TKeyValueRequest::TCmdWrite::DATA_NOT_SET:
+ Y_UNREACHABLE();
+ }
+
interm.Tactic = TEvBlobStorage::TEvPut::TacticDefault;
switch (request.GetTactic()) {
case NKikimrClient::TKeyValueRequest::MIN_LATENCY:
@@ -2449,7 +2477,7 @@ TPrepareResult TKeyValueState::PrepareOneCmd(const TCommand::Write &request, THo
intermediate->Commands.emplace_back(TIntermediate::TWrite());
auto &cmd = std::get<TIntermediate::TWrite>(intermediate->Commands.back());
cmd.Key = request.key();
- cmd.Data = TRcBuf(request.value());
+ cmd.Data = TRope(request.value());
switch (request.tactic()) {
case TCommand::Write::TACTIC_MIN_LATENCY:
cmd.Tactic = TEvBlobStorage::TEvPut::TacticMinLatency;
@@ -3126,6 +3154,8 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol
}
intermediate->HasIncrementGeneration = request.HasCmdIncrementGeneration();
+ intermediate->UsePayloadInResponse = request.GetUsePayloadInResponse();
+
if (CheckDeadline(ctx, request, intermediate)) {
return false;
}
@@ -3152,7 +3182,7 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol
error = error || PrepareCmdRename(ctx, request, intermediate);
error = error || PrepareCmdConcat(ctx, request, intermediate);
error = error || PrepareCmdDelete(ctx, request, intermediate);
- error = error || PrepareCmdWrite(ctx, request, intermediate, info);
+ error = error || PrepareCmdWrite(ctx, request, *ev->Get(), intermediate, info);
error = error || PrepareCmdGetStatus(ctx, request, intermediate, info);
error = error || PrepareCmdTrimLeakedBlobs(ctx, request, intermediate, info);
error = error || PrepareCmdSetExecutorFastLogPolicy(ctx, request, intermediate, info);
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index 901cd5d19e..2f1afd766d 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -530,7 +530,7 @@ public:
THolder<TIntermediate> &intermediate);
bool PrepareCmdDelete(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,
THolder<TIntermediate> &intermediate);
- bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,
+ bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, TEvKeyValue::TEvRequest& ev,
THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info);
bool PrepareCmdGetStatus(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,
THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info);
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
index 505716e6a2..c73420dd90 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/util/stlog.h>
#include <library/cpp/actors/protos/services_common.pb.h>
+#include <util/generic/overloaded.h>
namespace NKikimr {
@@ -62,16 +63,6 @@ public:
return std::holds_alternative<TIntermediate::TRead>(GetCommand());
}
- bool IsRangeRead() const {
- return std::holds_alternative<TIntermediate::TRangeRead>(GetCommand());
- }
-
- void AddRead(TIntermediate::TRead &read) {
- for (auto &readItem : read.ReadItems) {
- ReadItems.push_back({&read, &readItem});
- }
- }
-
NKikimrBlobStorage::EGetHandleClass GetHandleClass() const {
auto visitor = [&] (auto &request) {
return request.HandleClass;
@@ -99,19 +90,22 @@ public:
}
ui32 readCount = 0;
- auto addReadItems = [&](auto &request) {
- using Type = std::decay_t<decltype(request)>;
- if constexpr (std::is_same_v<Type, TIntermediate::TRead>) {
- AddRead(request);
- readCount++;
- } else {
- for (auto &read : request.Reads) {
- AddRead(read);
- readCount++;
- }
+ auto addRead = [&](TIntermediate::TRead& read) {
+ for (auto& readItem : read.ReadItems) {
+ ReadItems.push_back({&read, &readItem});
}
+ ++readCount;
};
- std::visit(addReadItems, GetCommand());
+ std::visit(TOverloaded{
+ [&](TIntermediate::TRead& read) {
+ addRead(read);
+ },
+ [&](TIntermediate::TRangeRead& rangeRead) {
+ for (auto& read : rangeRead.Reads) {
+ addRead(read);
+ }
+ }
+ }, GetCommand());
if (ReadItems.empty()) {
auto getStatus = [&](auto &request) {
@@ -284,9 +278,6 @@ public:
read.Status = response.Status;
if (response.Status == NKikimrProto::OK) {
- if (read.Value.size() != read.ValueSize) {
- read.Value.resize(read.ValueSize);
- }
Y_VERIFY_S(response.Buffer.size() == readItem.BlobSize,
"response.Buffer.size()# " << response.Buffer.size()
<< " readItem.BlobSize# " << readItem.BlobSize);
@@ -294,11 +285,9 @@ public:
"readItem.ValueOffset# " << readItem.ValueOffset
<< " readItem.BlobSize# " << readItem.BlobSize
<< " read.ValueSize# " << read.ValueSize);
- Y_VERIFY(read.Value.IsDetached());
- response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size());
IntermediateResult->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), batch.GroupId)] += response.Buffer.size();
- // FIXME: count distinct blobs?" keyvalue_storage_request.cpp:279
IntermediateResult->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), batch.GroupId)] += 1;
+ read.Value.Write(readItem.ValueOffset, std::move(response.Buffer));
} else {
Y_VERIFY_DEBUG_S(response.Status != NKikimrProto::NODATA, "NODATA received for TEvGet"
<< " TabletId# " << TabletInfo->TabletID
@@ -414,7 +403,10 @@ public:
response->Record.set_requested_key(interRead.Key);
response->Record.set_requested_offset(interRead.Offset);
response->Record.set_requested_size(interRead.RequestedSize);
- response->Record.set_value(interRead.Value);
+
+ TRope value = interRead.BuildRope();
+ const TContiguousSpan span = value.GetContiguousSpan();
+ response->Record.set_value(span.data(), span.size());
if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) {
response->Record.set_node_id(SelfId().NodeId());
@@ -455,7 +447,11 @@ public:
for (auto &interRead : interRange.Reads) {
auto *kvp = readRangeResult.add_pair();
kvp->set_key(interRead.Key);
- kvp->set_value(interRead.Value);
+
+ TRope value = interRead.BuildRope();
+ const TContiguousSpan span = value.GetContiguousSpan();
+ kvp->set_value(span.data(), span.size());
+
kvp->set_value_size(interRead.ValueSize);
kvp->set_creation_unix_time(interRead.CreationUnixTime);
ui32 storageChannel = MainStorageChannelInPublicApi;
diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp
index 84005ead26..b5e546a1df 100644
--- a/ydb/core/keyvalue/keyvalue_storage_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp
@@ -285,15 +285,11 @@ public:
auto& readItem = *it->ReadItem;
if (response.Status == NKikimrProto::OK) {
- if (read.Value.size() != read.ValueSize) {
- read.Value.resize(read.ValueSize);
- }
Y_VERIFY(response.Buffer.size() == readItem.BlobSize);
Y_VERIFY(readItem.ValueOffset + readItem.BlobSize <= read.ValueSize);
- Y_VERIFY(read.Value.IsDetached());
- response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size());
IntermediateResults->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), groupId)] += response.Buffer.size();
IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs?
+ read.Value.Write(readItem.ValueOffset, std::move(response.Buffer));
} else {
TStringStream err;
if (read.Message.size()) {
@@ -633,15 +629,15 @@ public:
if (request.Status != NKikimrProto::SCHEDULED) {
Y_VERIFY(request.Status == NKikimrProto::UNKNOWN);
- const TRcBuf& data = request.Data;
- const TContiguousSpan whole = data.GetContiguousSpan();
+ const TRope& data = request.Data;
+ auto iter = data.begin();
- ui64 offset = 0;
for (const TLogoBlobID& logoBlobId : request.LogoBlobIds) {
- const TContiguousSpan chunk = whole.SubSpan(offset, logoBlobId.BlobSize());
+ const auto begin = iter;
+ iter += logoBlobId.BlobSize();
THolder<TEvBlobStorage::TEvPut> put(
new TEvBlobStorage::TEvPut(
- logoBlobId, TRcBuf(TRcBuf::Piece, chunk.data(), chunk.size(), data),
+ logoBlobId, TRcBuf(TRope(begin, iter)),
IntermediateResults->Deadline, request.HandleClass,
request.Tactic));
const ui32 groupId = TabletInfo->GroupFor(logoBlobId.Channel(), logoBlobId.Generation());
@@ -654,7 +650,6 @@ public:
SendPutToGroup(ctx, groupId, TabletInfo.Get(), std::move(put), i);
++WriteRequestsSent;
- offset += logoBlobId.BlobSize();
}
}
}
diff --git a/ydb/core/protos/msgbus_kv.proto b/ydb/core/protos/msgbus_kv.proto
index 87c65e5176..c8317d9d4c 100644
--- a/ydb/core/protos/msgbus_kv.proto
+++ b/ydb/core/protos/msgbus_kv.proto
@@ -52,7 +52,10 @@ message TKeyValueRequest {
}
message TCmdWrite {
optional bytes Key = 1; // mandatory
- optional bytes Value = 2; // mandatory
+ oneof Data { // mandatory
+ bytes Value = 2;
+ uint32 PayloadId = 8;
+ }
optional EStorageChannel StorageChannel = 3; // (default = MAIN)
optional EPriority Priority = 4; // (default = REALTIME)
optional bytes KeyToCache = 5; // used in PQ
@@ -86,6 +89,7 @@ message TKeyValueRequest {
optional uint64 TabletId = 1; // mandatory
optional uint64 Generation = 2; // optional, no generation check is done if missing
optional uint64 Cookie = 3;
+ optional bool UsePayloadInResponse = 16; // use PayloadId instead of Value in TKeyValueResponse
repeated TCmdDeleteRange CmdDeleteRange = 4;
optional TCmdIncrementGeneration CmdIncrementGeneration = 5; // conflicts with any other Cmd, must be the only one
repeated TCmdRead CmdRead = 6;
@@ -108,7 +112,10 @@ message TKeyValueResponse {
message TKeyValuePair {
optional bytes Key = 1;
- optional bytes Value = 2;
+ oneof Data {
+ bytes Value = 2;
+ uint32 PayloadId = 7;
+ }
optional uint32 ValueSize = 3;
optional uint64 CreationUnixTime = 4;
optional TKeyValueRequest.EStorageChannel StorageChannel = 5; // Returns the _actual_ storage channel
@@ -124,7 +131,10 @@ message TKeyValueResponse {
}
message TReadResult {
optional uint32 Status = 1; // EReplyStatus from ydb/core/protos/base.proto
- optional bytes Value = 2;
+ oneof Data {
+ bytes Value = 2;
+ uint32 PayloadId = 4;
+ }
optional string Message = 3;
}
message TReadRangeResult {