diff options
| author | alexvru <[email protected]> | 2023-07-26 16:29:35 +0300 | 
|---|---|---|
| committer | alexvru <[email protected]> | 2023-07-26 16:29:35 +0300 | 
| commit | 1f6b57071583f89299bb5abd3863d594f23c5be5 (patch) | |
| tree | 2b29cf80624551b5c89a45fab5cce399ad00beeb | |
| parent | 66ff7742308d5402fb5e43c059763d67486a97d3 (diff) | |
Zero-copy in KV KIKIMR-18849
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_index_record.cpp | 51 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_index_record.h | 4 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.cpp | 10 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.h | 9 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 60 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 2 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request.cpp | 54 | ||||
| -rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_request.cpp | 17 | ||||
| -rw-r--r-- | ydb/core/protos/msgbus_kv.proto | 16 | 
9 files changed, 133 insertions, 90 deletions
| diff --git a/ydb/core/keyvalue/keyvalue_index_record.cpp b/ydb/core/keyvalue/keyvalue_index_record.cpp index 35b4f8c1e0a..120fe31fdf1 100644 --- a/ydb/core/keyvalue/keyvalue_index_record.cpp +++ b/ydb/core/keyvalue/keyvalue_index_record.cpp @@ -11,7 +11,7 @@ TIndexRecord::TChainItem::TChainItem(const TLogoBlobID &id, ui64 offset)  {  } -TIndexRecord::TChainItem::TChainItem(TRcBuf&& inlineData, ui64 offset) +TIndexRecord::TChainItem::TChainItem(TRope&& inlineData, ui64 offset)      : InlineData(std::move(inlineData))      , Offset(offset)  { @@ -64,16 +64,12 @@ ui32 TIndexRecord::GetReadItems(ui64 offset, ui64 size, TIntermediate::TRead& re          Y_VERIFY(it != Chain.end());          ui32 readSize = Min<ui64>(size, it->GetSize() - offset);          if (it->IsInline()) { -            if (read.Value.size() != read.ValueSize) { -                read.Value.resize(read.ValueSize); -            } -            Y_VERIFY(it->InlineData.size() >= readSize + offset, "size# %" PRIu64 " read# %" PRIu64 " offset# %" PRIu64, -                    (ui64)it->InlineData.size(), (ui64)readSize, (ui64)offset); -            Y_VERIFY(read.ValueSize >= readSize + valueOffset); -            memcpy(const_cast<char *>(read.Value.data()) + valueOffset, it->InlineData.data() + offset, readSize); +            const auto& rope = it->InlineData; +            const auto begin = rope.Position(offset); +            const auto end = begin + readSize; +            read.Value.Write(valueOffset, TRope(begin, end));          } else { -            read.ReadItems.push_back(TIntermediate::TRead::TReadItem( -                    it->LogoBlobId, static_cast<ui32>(offset), readSize, valueOffset)); +            read.ReadItems.emplace_back(it->LogoBlobId, static_cast<ui32>(offset), readSize, valueOffset);          }          size -= readSize;          offset = 0; @@ -105,17 +101,18 @@ TString TIndexRecord::Serialize() const {      data->CreationUnixTime = CreationUnixTime;      ui64 offset = 0; -    for (ui32 i = 0; i < Chain.size(); ++i) { -        if (Chain[i].IsInline()) { +    for (const auto& item : Chain) { +        if (item.IsInline()) {              memset(data->Serialized + offset, 0, sizeof(ui64));              offset += sizeof(ui64); -            ui32 size = Chain[i].InlineData.size(); +            ui32 size = item.InlineData.size();              memcpy(data->Serialized + offset, &size, sizeof(ui32));              offset += sizeof(ui32); -            memcpy(data->Serialized + offset, Chain[i].InlineData.data(), Chain[i].InlineData.size()); -            offset += Chain[i].InlineData.size(); +            auto& rope = item.InlineData; +            rope.begin().ExtractPlainDataAndAdvance(data->Serialized + offset, rope.size()); +            offset += rope.size();          } else { -            memcpy(data->Serialized + offset, &Chain[i].LogoBlobId, sizeof(TLogoBlobID)); +            memcpy(data->Serialized + offset, &item.LogoBlobId, sizeof(TLogoBlobID));              offset += sizeof(TLogoBlobID);          }      } @@ -169,15 +166,17 @@ bool TIndexRecord::Deserialize1(const TString &rawData, TString &outErrorInfo) {  bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) {      Y_VERIFY(rawData.size() >= sizeof(TKeyValueData2)); -    const TKeyValueData2 *data = (const TKeyValueData2 *)rawData.data(); -    if (!data->CheckChecksum(rawData.size())) { +    TRcBuf rawDataBuffer(rawData); // encode TString into TRcBuf to slice it further +    const TContiguousSpan rawDataSpan = rawDataBuffer.GetContiguousSpan(); +    const TKeyValueData2 *data = reinterpret_cast<const TKeyValueData2*>(rawDataSpan.data()); +    if (!data->CheckChecksum(rawDataSpan.size())) {          TStringStream str; -        str << " data->CheckChecksum(rawData.size)# ERROR "; +        str << " data->CheckChecksum(rawDataSpan.size)# ERROR ";          str << " CreationUnixTime# " << data->CreationUnixTime; -        str << " rawData.size# " << rawData.size(); +        str << " rawDataSpan.size# " << rawDataSpan.size();          str << " data# "; -        for (ui32 i = 0; i < rawData.size(); ++i) { -            ui8 d = ((const ui8*)rawData.data())[i]; +        for (ui32 i = 0; i < rawDataSpan.size(); ++i) { +            ui8 d = ((const ui8*)rawDataSpan.data())[i];              str << Sprintf("%02x", (ui32)d);          }          outErrorInfo = str.Str(); @@ -186,7 +185,7 @@ bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) {      Y_VERIFY(data->DataHeader.ItemType == EIT_KEYVALUE_2);      CreationUnixTime = data->CreationUnixTime;      ui64 chainOffset = 0; -    ui64 endOffset = rawData.size() - sizeof(TKeyValueData2); +    ui64 endOffset = rawDataSpan.size() - sizeof(TKeyValueData2);      ui64 offset = 0;      while (offset < endOffset) {          if (endOffset - offset < sizeof(ui64)) { @@ -218,8 +217,10 @@ bool TIndexRecord::Deserialize2(const TString &rawData, TString &outErrorInfo) {                  outErrorInfo = " Deserialization error# DEA4";                  return false;              } -            TRcBuf inlineData = TRcBuf::Uninitialized(size); -            memcpy(inlineData.GetDataMut(), data->Serialized + offset, size); +            TRope inlineData; +            if (size) { +                inlineData = TRcBuf(TRcBuf::Piece, data->Serialized + offset, size, rawDataBuffer); +            }              offset += size;              Chain.push_back(TIndexRecord::TChainItem(std::move(inlineData), chainOffset));              chainOffset += size; diff --git a/ydb/core/keyvalue/keyvalue_index_record.h b/ydb/core/keyvalue/keyvalue_index_record.h index f4797861abd..80ca38a42bb 100644 --- a/ydb/core/keyvalue/keyvalue_index_record.h +++ b/ydb/core/keyvalue/keyvalue_index_record.h @@ -10,11 +10,11 @@ namespace NKeyValue {  struct TIndexRecord {      struct TChainItem {          TLogoBlobID LogoBlobId; -        TRcBuf InlineData; +        TRope InlineData;          ui64 Offset;          TChainItem(const TLogoBlobID &id, ui64 offset); -        TChainItem(TRcBuf&& inlineData, ui64 offset); +        TChainItem(TRope&& inlineData, ui64 offset);          bool IsInline() const;          ui64 GetSize() const; diff --git a/ydb/core/keyvalue/keyvalue_intermediate.cpp b/ydb/core/keyvalue/keyvalue_intermediate.cpp index 82a78d9fa29..16e6cb06eb8 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.cpp +++ b/ydb/core/keyvalue/keyvalue_intermediate.cpp @@ -51,6 +51,12 @@ NKikimrProto::EReplyStatus TIntermediate::TRead::CumulativeStatus() const {      }  } +TRope TIntermediate::TRead::BuildRope() { +    TRope rope = Value ? Value.GetMonolith() : TRope(); +    Y_VERIFY(!Value || rope.size() == ValueSize); +    return rope; +} +  TIntermediate::TIntermediate(TActorId respondTo, TActorId keyValueActorId, ui64 channelGeneration, ui64 channelStep,          TRequestType::EType requestType)      : Cookie(0) @@ -81,7 +87,7 @@ void TIntermediate::UpdateStat() {              Stat.ReadNodata++;          } else if (read.Status == NKikimrProto::OK) {              Stat.Reads++; -            Stat.ReadBytes += read.Value.size(); +            Stat.ReadBytes += read.ValueSize;          }      };      auto checkRangeRead = [&] (const auto &range) { @@ -91,7 +97,7 @@ void TIntermediate::UpdateStat() {                      Stat.RangeReadItemsNodata++;                  } else if (read.Status == NKikimrProto::OK) {                      Stat.RangeReadItems++; -                    Stat.RangeReadBytes += read.Value.size(); +                    Stat.RangeReadBytes += read.ValueSize;                  }              }          } else { diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index 8df55fcaed7..3812a8e7fa5 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -7,6 +7,7 @@  #include <ydb/core/protos/base.pb.h>  #include <ydb/core/protos/msgbus_kv.pb.h>  #include <ydb/core/protos/msgbus.pb.h> +#include <ydb/core/util/fragmented_buffer.h>  #include <ydb/core/keyvalue/protos/events.pb.h>  namespace NKikimr { @@ -34,7 +35,7 @@ struct TIntermediate {          TVector<TReadItem> ReadItems;          TString Key; -        TString Value; +        TFragmentedBuffer Value;          ui32 Offset;          ui32 Size;          ui32 ValueSize; @@ -50,6 +51,7 @@ struct TIntermediate {                  NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel);          NKikimrProto::EReplyStatus ItemsStatus() const;          NKikimrProto::EReplyStatus CumulativeStatus() const; +        TRope BuildRope();      };      struct TRangeRead {          TDeque<TRead> Reads; @@ -61,7 +63,7 @@ struct TIntermediate {      struct TWrite {          TVector<TLogoBlobID> LogoBlobIds;          TString Key; -        TRcBuf Data; +        TRope Data;          TEvBlobStorage::TEvPut::ETactic Tactic;          NKikimrBlobStorage::EPutHandleClass HandleClass;          NKikimrProto::EReplyStatus Status; @@ -148,9 +150,12 @@ struct TIntermediate {      bool IsReplied; +    bool UsePayloadInResponse = false; +      TRequestStat Stat;      NKikimrClient::TResponse Response; +    std::vector<TRope> Payload;      NKikimrKeyValue::ExecuteTransactionResult ExecuteTransactionResponse;      NKikimrKeyValue::GetStorageChannelStatusResult GetStorageChannelStatusResponse; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index aba59e184fb..f05fc122e16 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -875,6 +875,9 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon          if (intermediate->EvType == TEvKeyValue::TEvRequest::EventType) {              THolder<TEvKeyValue::TEvResponse> response(new TEvKeyValue::TEvResponse);              response->Record = intermediate->Response; +            for (auto& item : intermediate->Payload) { +                response->AddPayload(std::move(item)); +            }              ResourceMetrics->Network.Increment(response->Record.ByteSize());              ctx.Send(intermediate->RespondTo, response.Release());          } @@ -918,16 +921,22 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon  void TKeyValueState::ProcessCmd(TIntermediate::TRead &request,          NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse, -        NKikimrKeyValue::StorageChannel */*response*/, +        NKikimrKeyValue::StorageChannel* /*response*/,          ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, -        TIntermediate* /*intermediate*/) +        TIntermediate *intermediate)  {      NKikimrProto::EReplyStatus outStatus = request.CumulativeStatus();      request.Status = outStatus;      legacyResponse->SetStatus(outStatus);      if (outStatus == NKikimrProto::OK) { -        legacyResponse->SetValue(request.Value); -        Y_VERIFY(request.Value.size() == request.ValueSize); +        TRope value = request.BuildRope(); +        if (intermediate->UsePayloadInResponse) { +            legacyResponse->SetPayloadId(intermediate->Payload.size()); +            intermediate->Payload.push_back(std::move(value)); +        } else { +            const TContiguousSpan span(value.GetContiguousSpan()); +            legacyResponse->SetValue(span.data(), span.size()); +        }      } else {          legacyResponse->SetMessage(request.Message);          if (outStatus == NKikimrProto::NODATA) { @@ -960,7 +969,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,          NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse,          NKikimrKeyValue::StorageChannel */*response*/,          ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, -        TIntermediate* /*intermediate*/) +        TIntermediate *intermediate)  {      for (ui64 r = 0; r < request.Reads.size(); ++r) {          auto &read = request.Reads[r]; @@ -1003,8 +1012,14 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,          resultKv->SetStatus(outStatus);          resultKv->SetKey(read.Key);          if (request.IncludeData && (outStatus == NKikimrProto::OK || outStatus == NKikimrProto::OVERRUN)) { -            resultKv->SetValue(read.Value); -            Y_VERIFY(read.Value.size() == read.ValueSize); +            TRope value = read.BuildRope(); +            if (intermediate->UsePayloadInResponse) { +                resultKv->SetPayloadId(intermediate->Payload.size()); +                intermediate->Payload.push_back(std::move(value)); +            } else { +                const TContiguousSpan span = value.GetContiguousSpan(); +                resultKv->SetValue(span.data(), span.size()); +            }          }          resultKv->SetValueSize(read.ValueSize);          resultKv->SetCreationUnixTime(read.CreationUnixTime); @@ -1037,7 +1052,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,      record.Chain = {};      ui32 storage_channel = 0;      if (request.Status == NKikimrProto::SCHEDULED) { -        TRcBuf inlineData = request.Data; +        TRope inlineData = request.Data;          const size_t size = inlineData.size();          record.Chain.push_back(TIndexRecord::TChainItem(std::move(inlineData), 0));          CountWriteRecord(0, size); @@ -1177,7 +1192,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request,          for (TIndexRecord::TChainItem& chainItem : input.Chain) {              if (chainItem.IsInline()) { -                chain.push_back(TIndexRecord::TChainItem(TRcBuf(chainItem.InlineData), offset)); +                chain.push_back(TIndexRecord::TChainItem(TRope(chainItem.InlineData), offset));              } else {                  const TLogoBlobID& id = chainItem.LogoBlobId;                  chain.push_back(TIndexRecord::TChainItem(id, offset)); @@ -2188,7 +2203,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u  }  bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, -        THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) { +        TEvKeyValue::TEvRequest& ev, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) {      intermediate->WriteIndices.reserve(kvRequest.CmdWriteSize());      for (ui32 i = 0; i < kvRequest.CmdWriteSize(); ++i) {          auto& request = kvRequest.GetCmdWrite(i); @@ -2203,10 +2218,10 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK              ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate);              return true;          } -        if (!request.HasValue()) { +        if (request.GetDataCase() == NKikimrClient::TKeyValueRequest::TCmdWrite::DATA_NOT_SET) {              TStringStream str;              str << "KeyValue# " << TabletId; -            str << " Missing Value in CmdWrite(" << i << ") Marker# KV10"; +            str << " Missing Value/PayloadId in CmdWrite(" << i << ") Marker# KV10";              ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate);              return true;          } @@ -2249,7 +2264,20 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK          }          interm.Key = request.GetKey(); -        interm.Data = TRcBuf(request.GetValue()); + +        switch (request.GetDataCase()) { +            case NKikimrClient::TKeyValueRequest::TCmdWrite::kValue: +                interm.Data = TRope(request.GetValue()); +                break; + +            case NKikimrClient::TKeyValueRequest::TCmdWrite::kPayloadId: +                interm.Data = ev.GetPayload(request.GetPayloadId()); +                break; + +            case NKikimrClient::TKeyValueRequest::TCmdWrite::DATA_NOT_SET: +                Y_UNREACHABLE(); +        } +          interm.Tactic = TEvBlobStorage::TEvPut::TacticDefault;          switch (request.GetTactic()) {              case NKikimrClient::TKeyValueRequest::MIN_LATENCY: @@ -2449,7 +2477,7 @@ TPrepareResult TKeyValueState::PrepareOneCmd(const TCommand::Write &request, THo      intermediate->Commands.emplace_back(TIntermediate::TWrite());      auto &cmd = std::get<TIntermediate::TWrite>(intermediate->Commands.back());      cmd.Key = request.key(); -    cmd.Data = TRcBuf(request.value()); +    cmd.Data = TRope(request.value());      switch (request.tactic()) {      case TCommand::Write::TACTIC_MIN_LATENCY:          cmd.Tactic = TEvBlobStorage::TEvPut::TacticMinLatency; @@ -3126,6 +3154,8 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol      }      intermediate->HasIncrementGeneration = request.HasCmdIncrementGeneration(); +    intermediate->UsePayloadInResponse = request.GetUsePayloadInResponse(); +      if (CheckDeadline(ctx, request, intermediate)) {          return false;      } @@ -3152,7 +3182,7 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol      error = error || PrepareCmdRename(ctx, request, intermediate);      error = error || PrepareCmdConcat(ctx, request, intermediate);      error = error || PrepareCmdDelete(ctx, request, intermediate); -    error = error || PrepareCmdWrite(ctx, request, intermediate, info); +    error = error || PrepareCmdWrite(ctx, request, *ev->Get(), intermediate, info);      error = error || PrepareCmdGetStatus(ctx, request, intermediate, info);      error = error || PrepareCmdTrimLeakedBlobs(ctx, request, intermediate, info);      error = error || PrepareCmdSetExecutorFastLogPolicy(ctx, request, intermediate, info); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 901cd5d19ef..2f1afd766da 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -530,7 +530,7 @@ public:          THolder<TIntermediate> &intermediate);      bool PrepareCmdDelete(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,          THolder<TIntermediate> &intermediate); -    bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, +    bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, TEvKeyValue::TEvRequest& ev,          THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info);      bool PrepareCmdGetStatus(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest,          THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info); diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp index 505716e6a23..c73420dd90a 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp @@ -3,6 +3,7 @@  #include <ydb/core/util/stlog.h>  #include <library/cpp/actors/protos/services_common.pb.h> +#include <util/generic/overloaded.h>  namespace NKikimr { @@ -62,16 +63,6 @@ public:          return std::holds_alternative<TIntermediate::TRead>(GetCommand());      } -    bool IsRangeRead() const { -        return std::holds_alternative<TIntermediate::TRangeRead>(GetCommand()); -    } - -    void AddRead(TIntermediate::TRead &read) { -        for (auto &readItem : read.ReadItems) { -            ReadItems.push_back({&read, &readItem}); -        } -    } -      NKikimrBlobStorage::EGetHandleClass GetHandleClass() const {          auto visitor = [&] (auto &request) {              return request.HandleClass; @@ -99,19 +90,22 @@ public:          }          ui32 readCount = 0; -        auto addReadItems = [&](auto &request) { -            using Type = std::decay_t<decltype(request)>; -            if constexpr (std::is_same_v<Type, TIntermediate::TRead>) { -                AddRead(request); -                readCount++; -            } else { -                for (auto &read : request.Reads) { -                    AddRead(read); -                    readCount++; -                } +        auto addRead = [&](TIntermediate::TRead& read) { +            for (auto& readItem : read.ReadItems) { +                ReadItems.push_back({&read, &readItem});              } +            ++readCount;          }; -        std::visit(addReadItems, GetCommand()); +        std::visit(TOverloaded{ +            [&](TIntermediate::TRead& read) { +                addRead(read); +            }, +            [&](TIntermediate::TRangeRead& rangeRead) { +                for (auto& read : rangeRead.Reads) { +                    addRead(read); +                } +            } +        }, GetCommand());          if (ReadItems.empty()) {              auto getStatus = [&](auto &request) { @@ -284,9 +278,6 @@ public:              read.Status = response.Status;              if (response.Status == NKikimrProto::OK) { -                if (read.Value.size() != read.ValueSize) { -                    read.Value.resize(read.ValueSize); -                }                  Y_VERIFY_S(response.Buffer.size() == readItem.BlobSize,                          "response.Buffer.size()# " << response.Buffer.size()                          << " readItem.BlobSize# " << readItem.BlobSize); @@ -294,11 +285,9 @@ public:                          "readItem.ValueOffset# " << readItem.ValueOffset                          << " readItem.BlobSize# " << readItem.BlobSize                          << " read.ValueSize# " << read.ValueSize); -                Y_VERIFY(read.Value.IsDetached()); -                response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size());                  IntermediateResult->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), batch.GroupId)] += response.Buffer.size(); -                // FIXME: count distinct blobs?" keyvalue_storage_request.cpp:279                  IntermediateResult->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), batch.GroupId)] += 1; +                read.Value.Write(readItem.ValueOffset, std::move(response.Buffer));              } else {                  Y_VERIFY_DEBUG_S(response.Status != NKikimrProto::NODATA, "NODATA received for TEvGet"                      << " TabletId# " << TabletInfo->TabletID @@ -414,7 +403,10 @@ public:          response->Record.set_requested_key(interRead.Key);          response->Record.set_requested_offset(interRead.Offset);          response->Record.set_requested_size(interRead.RequestedSize); -        response->Record.set_value(interRead.Value); + +        TRope value = interRead.BuildRope(); +        const TContiguousSpan span = value.GetContiguousSpan(); +        response->Record.set_value(span.data(), span.size());          if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) {              response->Record.set_node_id(SelfId().NodeId()); @@ -455,7 +447,11 @@ public:          for (auto &interRead : interRange.Reads) {              auto *kvp = readRangeResult.add_pair();              kvp->set_key(interRead.Key); -            kvp->set_value(interRead.Value); + +            TRope value = interRead.BuildRope(); +            const TContiguousSpan span = value.GetContiguousSpan(); +            kvp->set_value(span.data(), span.size()); +              kvp->set_value_size(interRead.ValueSize);              kvp->set_creation_unix_time(interRead.CreationUnixTime);              ui32 storageChannel = MainStorageChannelInPublicApi; diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp index 84005ead266..b5e546a1df0 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp @@ -285,15 +285,11 @@ public:              auto& readItem = *it->ReadItem;              if (response.Status == NKikimrProto::OK) { -                if (read.Value.size() != read.ValueSize) { -                    read.Value.resize(read.ValueSize); -                }                  Y_VERIFY(response.Buffer.size() == readItem.BlobSize);                  Y_VERIFY(readItem.ValueOffset + readItem.BlobSize <= read.ValueSize); -                Y_VERIFY(read.Value.IsDetached()); -                response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size());                  IntermediateResults->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), groupId)] += response.Buffer.size();                  IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs? +                read.Value.Write(readItem.ValueOffset, std::move(response.Buffer));              } else {                  TStringStream err;                  if (read.Message.size()) { @@ -633,15 +629,15 @@ public:                  if (request.Status != NKikimrProto::SCHEDULED) {                      Y_VERIFY(request.Status == NKikimrProto::UNKNOWN); -                    const TRcBuf& data = request.Data; -                    const TContiguousSpan whole = data.GetContiguousSpan(); +                    const TRope& data = request.Data; +                    auto iter = data.begin(); -                    ui64 offset = 0;                      for (const TLogoBlobID& logoBlobId : request.LogoBlobIds) { -                        const TContiguousSpan chunk = whole.SubSpan(offset, logoBlobId.BlobSize()); +                        const auto begin = iter; +                        iter += logoBlobId.BlobSize();                          THolder<TEvBlobStorage::TEvPut> put(                              new TEvBlobStorage::TEvPut( -                                logoBlobId, TRcBuf(TRcBuf::Piece, chunk.data(), chunk.size(), data), +                                logoBlobId, TRcBuf(TRope(begin, iter)),                                  IntermediateResults->Deadline, request.HandleClass,                                  request.Tactic));                          const ui32 groupId = TabletInfo->GroupFor(logoBlobId.Channel(), logoBlobId.Generation()); @@ -654,7 +650,6 @@ public:                          SendPutToGroup(ctx, groupId, TabletInfo.Get(), std::move(put), i);                          ++WriteRequestsSent; -                        offset += logoBlobId.BlobSize();                      }                  }              } diff --git a/ydb/core/protos/msgbus_kv.proto b/ydb/core/protos/msgbus_kv.proto index 87c65e51764..c8317d9d4c0 100644 --- a/ydb/core/protos/msgbus_kv.proto +++ b/ydb/core/protos/msgbus_kv.proto @@ -52,7 +52,10 @@ message TKeyValueRequest {      }      message TCmdWrite {          optional bytes Key = 1; // mandatory -        optional bytes Value = 2; // mandatory +        oneof Data { // mandatory +            bytes Value = 2; +            uint32 PayloadId = 8; +        }          optional EStorageChannel StorageChannel = 3; // (default = MAIN)          optional EPriority Priority = 4; // (default = REALTIME)          optional bytes KeyToCache = 5; // used in PQ @@ -86,6 +89,7 @@ message TKeyValueRequest {      optional uint64 TabletId = 1; // mandatory      optional uint64 Generation = 2; // optional, no generation check is done if missing      optional uint64 Cookie = 3; +    optional bool UsePayloadInResponse = 16; // use PayloadId instead of Value in TKeyValueResponse      repeated TCmdDeleteRange CmdDeleteRange = 4;      optional TCmdIncrementGeneration CmdIncrementGeneration = 5; // conflicts with any other Cmd, must be the only one      repeated TCmdRead CmdRead = 6; @@ -108,7 +112,10 @@ message TKeyValueResponse {      message TKeyValuePair {          optional bytes Key = 1; -        optional bytes Value = 2; +        oneof Data { +            bytes Value = 2; +            uint32 PayloadId = 7; +        }          optional uint32 ValueSize = 3;          optional uint64 CreationUnixTime = 4;          optional TKeyValueRequest.EStorageChannel StorageChannel = 5; // Returns the _actual_ storage channel @@ -124,7 +131,10 @@ message TKeyValueResponse {      }      message TReadResult {          optional uint32 Status = 1; // EReplyStatus from ydb/core/protos/base.proto -        optional bytes Value = 2; +        oneof Data { +            bytes Value = 2; +            uint32 PayloadId = 4; +        }          optional string Message = 3;      }      message TReadRangeResult { | 
