diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-12 11:13:51 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-12 11:13:51 +0300 |
commit | 805223580f2054e7d21ff57cb6119d992c6e1af9 (patch) | |
tree | d0c03c9b74e5bc56d5be7597cb38bd67805972ef | |
parent | f15a5a2053e3c943171d08dc844a6117b1716c2d (diff) | |
download | ydb-805223580f2054e7d21ff57cb6119d992c6e1af9.tar.gz |
Switch to TRope reply in TEvGet query result KIKIMR-18394
48 files changed, 205 insertions, 181 deletions
diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h index 60d0ef904a1..11a09f5b9ad 100644 --- a/library/cpp/actors/util/rc_buf.h +++ b/library/cpp/actors/util/rc_buf.h @@ -446,14 +446,12 @@ class TRcBuf { } return Visit(Owner, [](EType, auto& value) -> TContiguousSpan { using T = std::decay_t<decltype(value)>; - if constexpr (std::is_same_v<T, TString>) { - return {&(*value.cbegin()), value.size()}; - } else if constexpr (std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) { + if constexpr (std::is_same_v<T, TString> || std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) { return {value.data(), value.size()}; } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->GetData(); } else { - return {}; + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -479,7 +477,7 @@ class TRcBuf { } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->GetDataMut(); } else { - return {}; + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -497,7 +495,7 @@ class TRcBuf { } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->UnsafeGetDataMut(); } else { - return {}; + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -515,7 +513,7 @@ class TRcBuf { } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->GetOccupiedMemorySize(); } else { - Y_FAIL(); + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -582,28 +580,26 @@ class TRcBuf { return false; } - template <class TResult> - TResult GetRaw() const { + template <typename TResult, typename TCallback> + std::invoke_result_t<TCallback, const TResult*> ApplySpecificValue(TCallback&& callback) const { + static_assert(std::is_same_v<TResult, TString> || + std::is_same_v<TResult, NActors::TSharedData> || + std::is_same_v<TResult, TInternalBackend> || + std::is_same_v<TResult, IContiguousChunk::TPtr>); + if (!Owner) { - return TResult{}; + return callback(nullptr); } - return Visit(Owner, [](EType, auto& value) { + return Visit(Owner, [&](EType, auto& value) { using T = std::decay_t<decltype(value)>; if constexpr (std::is_same_v<T, TResult>) { - return value; + return callback(&value); } else { - Y_FAIL(); - return TResult{}; // unreachable + return callback(nullptr); } }); } - NActors::TSharedData GetRawTrimmed(size_t size) const { - NActors::TSharedData result = GetRaw<NActors::TSharedData>(); - result.TrimBack(size); - return result; - } - explicit operator bool() const { return static_cast<bool>(Owner); } @@ -845,28 +841,6 @@ public: return Backend.ContainsNativeType<TType>(); } - template <class TResult> - TResult GetRaw() const { - return Backend.GetRaw<TResult>(); - } - - NActors::TSharedData GetRawTrimmed(size_t size) const { - return Backend.GetRawTrimmed(size); - } - - bool ReferencesWholeContainer() const { - return Backend.GetData().size() == GetSize(); - } - - - bool ReferencesTrimableToWholeContainer() const { - if (ContainsNativeType<NActors::TSharedData>()) { - return Backend.GetData().size() == (GetSize() + UnsafeTailroom()); - } else { - return ReferencesWholeContainer(); - } - } - bool CanGrowFront() const noexcept { return Backend.CanGrowFront(Begin); } @@ -913,18 +887,30 @@ public: template <class TResult> TResult ExtractUnderlyingContainerOrCopy() const { - if (ContainsNativeType<TResult>() && (ReferencesWholeContainer() || ReferencesTrimableToWholeContainer())) { - using T = std::decay_t<TResult>; - if constexpr (std::is_same_v<T, NActors::TSharedData>) { - return GetRawTrimmed(GetSize()); - } else { - return GetRaw<TResult>(); + static_assert(std::is_same_v<TResult, TString> || + std::is_same_v<TResult, NActors::TSharedData> || + std::is_same_v<TResult, TInternalBackend>); + + constexpr bool isSharedData = std::is_same_v<TResult, NActors::TSharedData>; + TResult res; + + const bool found = Backend.ApplySpecificValue<TResult>([&](const TResult *raw) { + if (raw && raw->data() == Begin && (isSharedData ? End <= Begin + raw->size() : End == Begin + raw->size())) { + if constexpr (isSharedData) { + raw->TrimBack(size()); + } + res = TResult(*raw); + return true; } + return false; + }); + + if (!found) { + res = TResult::Uninitialized(GetSize()); + char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res); + std::memcpy(data, GetData(), GetSize()); } - TResult res = TResult::Uninitialized(GetSize()); - char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res); - std::memcpy(data, Begin, End - Begin); return res; } @@ -932,7 +918,7 @@ public: return {GetData(), GetSize()}; } - TStringBuf Slice(size_t pos = 0, size_t len = -1) const noexcept { + TStringBuf Slice(size_t pos = 0, size_t len = Max<size_t>()) const noexcept { pos = Min(pos, size()); len = Min(len, size() - pos); return {const_cast<TRcBuf*>(this)->UnsafeGetDataMut() + pos, len}; diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h index 3c655881814..9df251db9ce 100644 --- a/library/cpp/actors/util/rope.h +++ b/library/cpp/actors/util/rope.h @@ -455,6 +455,10 @@ public: return !Size; } + bool empty() const { + return IsEmpty(); + } + operator bool() const { return Chain; } @@ -699,10 +703,7 @@ public: // Use this method carefully -- it may significantly reduce performance when misused. TString ConvertToString() const { - // TODO(innokentii): could be microoptimized for single TString case - TString res = TString::Uninitialized(GetSize()); - Begin().ExtractPlainDataAndAdvance(res.Detach(), res.size()); - return res; + return ExtractUnderlyingContainerOrCopy<TString>(); } /** @@ -710,14 +711,14 @@ public: */ template <class TResult> TResult ExtractUnderlyingContainerOrCopy() const { - if (IsContiguous() && GetSize() != 0) { - const auto& chunk = Begin().GetChunk(); - return chunk.ExtractUnderlyingContainerOrCopy<TResult>(); + if (Chain.begin() != Chain.end() && ++Chain.begin() == Chain.end()) { + return Chain.GetFirstChunk().ExtractUnderlyingContainerOrCopy<TResult>(); } - TResult res = TResult::Uninitialized(GetSize()); + const size_t size = GetSize(); + TResult res = TResult::Uninitialized(size); char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res); - Begin().ExtractPlainDataAndAdvance(data, res.size()); + Begin().ExtractPlainDataAndAdvance(data, size); return res; } diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index b9ccac76867..b8ff78474bc 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -1194,7 +1194,7 @@ struct TEvBlobStorage { TLogoBlobID Id; ui32 Shift; ui32 RequestedSize; - TString Buffer; + TRope Buffer; TVector<TPartMapItem> PartMap; bool Keep = false; bool DoNotKeep = false; @@ -1245,7 +1245,7 @@ struct TEvBlobStorage { str << " RequestedSize# " << response.RequestedSize; } if (isFull) { - str << " Buffer# " << response.Buffer.Quote(); + str << " Buffer# " << response.Buffer.ConvertToString().Quote(); } str << "}"; if (ErrorReason.size()) { diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 1620172c254..7509890e00c 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -149,6 +149,34 @@ namespace NKikimr::NBlobDepot { void RegisterRequestInFlight(TRequestInFlight *requestInFlight); }; + struct TReadOutcome { + struct TOk { + TRope Data; + }; + struct TNodata { + }; + struct TError { + NKikimrProto::EReplyStatus Status; + TString ErrorReason; + }; + std::variant<TOk, TNodata, TError> Value; + + TString ToString() const { + return std::visit(TOverloaded{ + [](const TOk& value) { + return TStringBuilder() << "{Ok Data.size# " << value.Data.size() << "}"; + }, + [](const TNodata& /*value*/) { + return TStringBuilder() << "{Nodata}"; + }, + [](const TError& value) { + return TStringBuilder() << "{Error Status# " << NKikimrProto::EReplyStatus_Name(value.Status) + << " ErrorReason# " << value.ErrorReason.Quote() << "}"; + } + }, Value); + } + }; + class TBlobDepotAgent : public TActorBootstrapped<TBlobDepotAgent> , public TRequestSender @@ -342,7 +370,7 @@ namespace NKikimr::NBlobDepot { virtual void Initiate() = 0; virtual void OnUpdateBlock() {} - virtual void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus /*status*/, TString /*dataOrErrorReason*/) {} + virtual void OnRead(ui64 /*tag*/, TReadOutcome&& /*outcome*/) {} virtual void OnIdAllocated(bool /*success*/) {} virtual void OnDestroy(bool /*success*/) {} diff --git a/ydb/core/blob_depot/agent/defs.h b/ydb/core/blob_depot/agent/defs.h index 09f2cba7af1..774204b5915 100644 --- a/ydb/core/blob_depot/agent/defs.h +++ b/ydb/core/blob_depot/agent/defs.h @@ -6,6 +6,7 @@ #include <ydb/core/blob_depot/types.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> +#include <ydb/core/util/fragmented_buffer.h> #include <ydb/core/util/stlog.h> #include <util/generic/hash_multi_map.h> diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index f577a74bfb7..e3c793a1ed8 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { { TReadArg ReadArg; const ui64 Size; - TString Buffer; + TFragmentedBuffer Buffer; bool Terminated = false; bool StopProcessingParts = false; ui32 NumPartsPending = 0; @@ -25,20 +25,22 @@ namespace NKikimr::NBlobDepot { void EndWithSuccess(TQuery *query) { Y_VERIFY(!Terminated); - query->OnRead(ReadArg.Tag, NKikimrProto::OK, std::move(Buffer)); + Y_VERIFY(Buffer.IsMonolith()); + Y_VERIFY(Buffer.GetMonolith().size() == Size); + query->OnRead(ReadArg.Tag, TReadOutcome{TReadOutcome::TOk{Buffer.GetMonolith()}}); Abort(); } void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) { Y_VERIFY(!Terminated); Y_VERIFY(status != NKikimrProto::NODATA && status != NKikimrProto::OK); - query->OnRead(ReadArg.Tag, status, errorReason); + query->OnRead(ReadArg.Tag, TReadOutcome{TReadOutcome::TError{status, std::move(errorReason)}}); Abort(); } void EndWithNoData(TQuery *query) { Y_VERIFY(!Terminated); - query->OnRead(ReadArg.Tag, NKikimrProto::NODATA, {}); + query->OnRead(ReadArg.Tag, TReadOutcome{TReadOutcome::TNodata{}}); Abort(); } @@ -188,20 +190,9 @@ namespace NKikimr::NBlobDepot { return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id); } - auto& buffer = readContext.Buffer; const ui64 offset = partContext.Offsets[i]; - Y_VERIFY(offset < readContext.Size && blob.Buffer.size() <= readContext.Size - offset); - - if (!buffer && !offset) { - buffer = std::move(blob.Buffer); - buffer.resize(readContext.Size); - } else { - if (!buffer) { - buffer = TString::Uninitialized(readContext.Size); - } - memcpy(buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size()); - } + readContext.Buffer.Write(offset, std::move(blob.Buffer)); } if (!--readContext.NumPartsPending) { diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index da5deaf4598..9e5c29c97b8 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -146,31 +146,28 @@ namespace NKikimr::NBlobDepot { } } - void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { + void OnRead(ui64 /*tag*/, TReadOutcome&& outcome) override { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA20, "OnRead", (AgentId, Agent.LogId), - (QueryId, GetQueryId()), (Status, status)); + (QueryId, GetQueryId()), (Outcome, outcome)); - switch (status) { - case NKikimrProto::OK: - Buffer = std::move(dataOrErrorReason); + std::visit(TOverloaded{ + [&](TReadOutcome::TOk& ok) { + Buffer = ok.Data.ConvertToString(); DoneWithData = true; CheckIfDone(); - break; - - case NKikimrProto::NODATA: { + }, + [&](TReadOutcome::TNodata& /*nodata*/) { // we are reading blob from the original group and it may be partially written -- it is totally // okay to have some; we need to advance to the next readable blob auto *range = Resolve.MutableItems(0)->MutableKeyRange(); range->SetEndingKey(Id.AsBinaryString()); range->ClearIncludeEnding(); IssueResolve(); - break; + }, + [&](TReadOutcome::TError& error) { + EndWithError(error.Status, error.ErrorReason); } - - default: - EndWithError(status, std::move(dataOrErrorReason)); - break; - } + }, outcome.Value); } void CheckIfDone() { diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index e09c1be28e5..300e9d50dcc 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -102,17 +102,28 @@ namespace NKikimr::NBlobDepot { return true; } - void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override { + void OnRead(ui64 tag, TReadOutcome&& outcome) override { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0), - (ErrorReason, status != NKikimrProto::OK ? buffer : "")); + (Tag, tag), (Outcome, outcome)); auto& resp = Response->Responses[tag]; Y_VERIFY(resp.Status == NKikimrProto::UNKNOWN); - resp.Status = status; - if (status == NKikimrProto::OK) { - resp.Buffer = std::move(buffer); - } + std::visit(TOverloaded{ + [&](TReadOutcome::TOk& ok) { + resp.Status = NKikimrProto::OK; + resp.Buffer = std::move(ok.Data); + }, + [&](TReadOutcome::TNodata& /*nodata*/) { + resp.Status = NKikimrProto::NODATA; + }, + [&](TReadOutcome::TError& error) { + resp.Status = error.Status; + if (Response->ErrorReason) { + Response->ErrorReason += ", "; + } + Response->ErrorReason += error.ErrorReason; + } + }, outcome.Value); --AnswersRemain; CheckAndFinish(); } diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index c4a736ca96c..b3072aa500c 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -114,32 +114,35 @@ namespace NKikimr::NBlobDepot { } } - void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { + void OnRead(ui64 tag, TReadOutcome&& outcome) override { --ReadsInFlight; Y_VERIFY(tag < Reads.size()); TRead& read = Reads[tag]; - switch (status) { - case NKikimrProto::OK: { - Y_VERIFY(dataOrErrorReason.size() == read.Id.BlobSize()); - const bool inserted = FoundBlobs.try_emplace(read.Id, std::move(dataOrErrorReason)).second; + const bool success = std::visit(TOverloaded{ + [&](TReadOutcome::TOk& ok) { + Y_VERIFY(ok.Data.size() == read.Id.BlobSize()); + const bool inserted = FoundBlobs.try_emplace(read.Id, ok.Data.ConvertToString()).second; Y_VERIFY_S(inserted, "AgentId# " << Agent.LogId << " QueryId# " << GetQueryId() << " duplicate BlobId# " << read.Id << " received"); - break; - } - - case NKikimrProto::NODATA: + return true; + }, + [&](TReadOutcome::TNodata& /*nodata*/) { // this blob has just vanished since we found it in index -- may be it was partially written and // now gone; it's okay to have this situation, not a data loss - break; + return true; + }, + [&](TReadOutcome::TError& error) { + EndWithError(error.Status, TStringBuilder() << "failed to retrieve BlobId# " + << read.Id << " Error# " << error.ErrorReason); + return false; + } + }, outcome.Value); - default: - return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# " - << read.Id << " Error# " << dataOrErrorReason); + if (success) { + CheckAndFinish(); } - - CheckAndFinish(); } void CheckAndFinish() { diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index dd9d4dd5df3..ce05524e84f 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -378,7 +378,7 @@ namespace NKikimr::NBlobDepot { const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize()); const ui64 putId = NextPutId++; - SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId); + SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, TRcBuf(resp.Buffer), TInstant::Max()), putId); const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing Y_VERIFY(inserted); const bool inserted1 = PutIdToKey.try_emplace(putId, TData::TKey(resp.Id), it->first).second; diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp index c1dda2ef7f1..050845c4c95 100644 --- a/ydb/core/blob_depot/data_decommit.cpp +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -245,7 +245,7 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // PUT QUERIES are used to store retrieved MustRestoreFirst blobs in local storage - void IssuePut(TKey key, TString&& buffer, bool keep, bool doNotKeep) { + void IssuePut(TKey key, TRope&& buffer, bool keep, bool doNotKeep) { std::vector<ui8> channels(1); if (Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels)) { TChannelInfo& channel = Self->Channels[channels.front()]; @@ -254,7 +254,7 @@ namespace NKikimr::NBlobDepot { const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size()); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender), (Cookie, Ev->Cookie), (Key, key), (BlobId, id)); - SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()), + SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, TRcBuf(buffer), TInstant::Max()), (ui64)keep | (ui64)doNotKeep << 1); const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing Y_VERIFY(inserted); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 6d06f68f795..091bd8df5dc 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -596,7 +596,7 @@ void EncryptInplace(TRope& rope, ui32 offset, ui32 size, const TLogoBlobID& id, void Decrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id, const TBlobStorageGroupInfo &info); -void DecryptInplace(TRope& rope, const TLogoBlobID& id, const TBlobStorageGroupInfo& info); +void DecryptInplace(TRope& rope, ui32 offset, ui32 shift, ui32 size, const TLogoBlobID& id, const TBlobStorageGroupInfo& info); IActor* CreateBlobStorageGroupRangeRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp index ca296d76ad3..0f97eae7b36 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp @@ -748,14 +748,14 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB if (IsGetBlockDone) { std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> result( new TEvBlobStorage::TEvDiscoverResult( - response.Id, MinGeneration, response.Buffer, BlockedGen)); + response.Id, MinGeneration, response.Buffer.ConvertToString(), BlockedGen)); A_LOG_DEBUG_S("BSD16", "Handle TEvGetResult status# OK Die. TEvDiscoverResult# " << result->Print(false)); SendResult(result); return; } PendingResult.reset(new TEvBlobStorage::TEvDiscoverResult(response.Id, MinGeneration, - response.Buffer, BlockedGen)); + response.Buffer.ConvertToString(), BlockedGen)); A_LOG_DEBUG_S("BSD17", "Handle TEvGetResult status# OK" << " Setting pending result# " << PendingResult->ToString()); Y_VERIFY(TotalRecieved < TotalSent); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp index 31ceba6c386..e85995a2f3e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp @@ -611,7 +611,7 @@ public: case NKikimrProto::OK: // okay response -- blob is read and stored in all replicas Y_VERIFY(resp.Id == ResultBlobId); - Buffer = resp.Buffer; + Buffer = resp.Buffer.ConvertToString(); GetFinished = true; TryToSatisfyRequest(); break; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp index 4e7ff9c2ce7..40a3bf4431c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp @@ -338,8 +338,8 @@ public: break; case NKikimrProto::OK: - SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvDiscoverResult>(resp.Id, MinGeneration, resp.Buffer, - BlockedGeneration)); + SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvDiscoverResult>(resp.Id, MinGeneration, + resp.Buffer.ConvertToString(), BlockedGeneration)); break; default: diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp index 62cf2a90fff..06d43748bbb 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp @@ -70,12 +70,13 @@ namespace NKikimr { Encrypt(destination, source, shift, sizeBytes, id, info); } - void DecryptInplace(TRope& rope, const TLogoBlobID& id, const TBlobStorageGroupInfo& info) { + void DecryptInplace(TRope& rope, ui32 offset, ui32 shift, ui32 size, const TLogoBlobID& id, const TBlobStorageGroupInfo& info) { if (info.GetEncryptionMode() == TBlobStorageGroupInfo::EEM_NONE) { return; } auto span = rope.GetContiguousSpanMut(); - Decrypt(span.data(), span.data(), 0, span.size(), id, info); + Y_VERIFY(offset < span.size() && size <= span.size() - offset); + Decrypt(span.data() + offset, span.data() + offset, shift, size, id, info); } } // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index ca277ffb8b5..40dcb64808d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -133,10 +133,8 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas ui32 size = query.Size ? Min(query.Size, query.Id.BlobSize() - shift) : query.Id.BlobSize() - shift; if (!PhantomCheck) { - TString data = TString::Uninitialized(size); - char *dataBuffer = data.Detach(); - blobState.Whole.Data.Read(shift, dataBuffer, size); - Decrypt(dataBuffer, dataBuffer, shift, size, query.Id, *Info); + TRope data = blobState.Whole.Data.Read(shift, size); + DecryptInplace(data, 0, shift, size, query.Id, *Info); outResponse.Buffer = std::move(data); Y_VERIFY(outResponse.Buffer, "%s empty response buffer", RequestPrefix.data()); ReplyBytes += outResponse.Buffer.size(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index 8950f18d7d3..01621b1fb3e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -187,7 +187,7 @@ public: return; } - Buffer = result->Responses[0].Buffer; + Buffer = result->Responses[0].Buffer.ConvertToString(); ApplyDiffs(); std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 1b7d4343712..b5ff53a6b82 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -389,7 +389,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } TEvBlobStorage::TEvPut *put; - DecryptInplace(item.Buffer, item.BlobId, *Info); + DecryptInplace(item.Buffer, 0, 0, item.Buffer.size(), item.BlobId, *Info); ev->Bunch.emplace_back(new IEventHandle( TActorId() /*recipient*/, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index d1783af11f8..a3925bf52c1 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -283,7 +283,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob Y_VERIFY(response.Id == BlobsToGet[i].BlobId); if (getResult.Responses[i].Status == NKikimrProto::OK) { - result->Responses.emplace_back(response.Id, std::move(response.Buffer), response.Keep, response.DoNotKeep); + result->Responses.emplace_back(response.Id, response.Buffer.ConvertToString(), response.Keep, response.DoNotKeep); } else if (getResult.Responses[i].Status != NKikimrProto::NODATA || BlobsToGet[i].RequiredToBePresent) { // it's okay to get NODATA if blob wasn't confirmed -- this blob is simply thrown out of resulting // set; otherwise we return error about lost data diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h index ead4f90bb6b..608b95bfafd 100644 --- a/ydb/core/blobstorage/dsproxy/mock/model.h +++ b/ydb/core/blobstorage/dsproxy/mock/model.h @@ -129,7 +129,8 @@ namespace NFake { const ui32 size = Min<ui32>(maxSize, !query.Size ? Max<ui32>() : query.Size); // calculate substring; use 0 instead of query.Shift because it may exceed the buffer - response.Buffer = data.Buffer.ConvertToString().substr(size ? query.Shift : 0, size); + const ui32 offset = size ? query.Shift : 0; + response.Buffer = TRope(data.Buffer.Position(offset), data.Buffer.Position(offset + size)); } else { // ensure this blob is not under GC Y_VERIFY(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data()); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp index 59f0c116c3c..638f4bc5975 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp @@ -120,7 +120,7 @@ void TestIntervalsAndCrcAllOk(TErasureType::EErasureSpecies erasureSpecies, bool UNIT_ASSERT_VALUES_EQUAL(a.Status, NKikimrProto::OK); UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer); + blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } } } @@ -451,7 +451,7 @@ private: UNIT_ASSERT_VALUES_EQUAL(a.Status, NKikimrProto::OK); UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - BlobSet->Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer); + BlobSet->Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } } }; @@ -765,7 +765,7 @@ Y_UNIT_TEST(TestBlock42VGetCountWithErasure) { if (a.Status == NKikimrProto::OK) { UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer); + blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } else { TStringStream str; str << " isRestore# " << isRestore @@ -909,7 +909,7 @@ Y_UNIT_TEST(TestBlock42WipedOneDiskAndErrorDurringGet) { if (a.Status == NKikimrProto::OK) { UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer); + blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } else { TStringStream str; str << " isRestore# " << isRestore @@ -1007,7 +1007,7 @@ void TestIntervalsWipedError(TErasureType::EErasureSpecies erasureSpecies, bool if (a.Status == NKikimrProto::OK) { UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - simulator.BlobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer); + simulator.BlobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } else { TStringStream str; str << " isRestore# " << isRestore @@ -1183,7 +1183,7 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo if (a.Status == NKikimrProto::OK) { UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer); + blobSet.Check(queryIdx % blobCount, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } else { TStringStream str; str << " isRestore# " << isRestore @@ -1280,7 +1280,7 @@ void SpecificTest(ui32 badA, ui32 badB, ui32 blobSize, TMap<i64, i64> sizeForOff if (a.Status == NKikimrProto::OK) { UNIT_ASSERT_VALUES_EQUAL(qb.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(qb.Size, a.RequestedSize); - simulator.BlobSet.Check(0, qb.Id, qb.Shift, qb.Size, a.Buffer); + simulator.BlobSet.Check(0, qb.Id, qb.Shift, qb.Size, a.Buffer.ConvertToString()); } else { TStringStream str; str << " isRestore# false setIdx# 0 status# " << a.Status; @@ -1492,7 +1492,7 @@ public: UNIT_ASSERT_VALUES_EQUAL_C(a.Status, NKikimrProto::OK, currentTestState.Str()); UNIT_ASSERT_VALUES_EQUAL_C(q.Shift, a.Shift, currentTestState.Str()); UNIT_ASSERT_VALUES_EQUAL_C(q.Size, a.RequestedSize, currentTestState.Str()); - BlobSet.Check(queryIdx, q.Id, q.Shift, q.Size, a.Buffer); + BlobSet.Check(queryIdx, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } } RequestsOrder.resize(InitialRequestsSize); @@ -1711,7 +1711,7 @@ public: UNIT_ASSERT_VALUES_EQUAL(a.Status, NKikimrProto::OK); UNIT_ASSERT_VALUES_EQUAL(q.Shift, a.Shift); UNIT_ASSERT_VALUES_EQUAL(q.Size, a.RequestedSize); - BlobSet.Check(queryIdx, q.Id, q.Shift, q.Size, a.Buffer); + BlobSet.Check(queryIdx, q.Id, q.Shift, q.Size, a.Buffer.ConvertToString()); } } }; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp index 151a316c2ce..6639d79f134 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp @@ -303,7 +303,7 @@ void ConductGet(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa getResult->Responses[0].Id = args.OriginalId; getResult->Responses[0].Status = NKikimrProto::ERROR; } else { - getResult->Responses[0].Buffer = args.Buffer; + getResult->Responses[0].Buffer = TRope(args.Buffer); getResult->Responses[0].Id = args.OriginalId; getResult->Responses[0].Status = NKikimrProto::OK; } diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp index 42aeb6db2ba..f6c35e627c0 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp @@ -1227,7 +1227,7 @@ Y_UNIT_TEST(TestGivenBlock42PutWhenPartialGetThenSingleDiskRequestOk) { UNIT_ASSERT_C(getResult->Responses[0].Status == NKikimrProto::OK, "Status# " << NKikimrProto::EReplyStatus_Name(getResult->Responses[0].Status)); TString expectedData = data.substr(shift, size); - TString actualData = getResult->Responses[0].Buffer; + TString actualData = getResult->Responses[0].Buffer.ConvertToString(); UNIT_ASSERT_STRINGS_EQUAL_C(expectedData, actualData, "ExpectedSize# " << expectedData.size() << " resultSize$ " << actualData.size() << " part# " << part << " disk# " << disk << " expectedFirst# " << (ui32) (ui8) expectedData[0] << " actualFirst# " << diff --git a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp index a7ae711ba71..04fea4b01c4 100644 --- a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp @@ -454,8 +454,8 @@ protected: VERBOSE_COUT(" response[" << i <<"]: " << StatusToString(response.Status)); if (response.Status == NKikimrProto::OK) { //TODO: Process response.Id (should be same logoblobid as in request) - LastResponse.Data[i] = response.Buffer; - VERBOSE_COUT(" shift: " << response.Shift << " data: " << response.Buffer.c_str()); + LastResponse.Data[i] = response.Buffer.ConvertToString(); + VERBOSE_COUT(" shift: " << response.Shift << " data: " << response.Buffer.ConvertToString()); } } } diff --git a/ydb/core/blobstorage/dsproxy/ut_ftol/dsproxy_fault_tolerance_ut_base.h b/ydb/core/blobstorage/dsproxy/ut_ftol/dsproxy_fault_tolerance_ut_base.h index ef7ae14e611..72e227eea78 100644 --- a/ydb/core/blobstorage/dsproxy/ut_ftol/dsproxy_fault_tolerance_ut_base.h +++ b/ydb/core/blobstorage/dsproxy/ut_ftol/dsproxy_fault_tolerance_ut_base.h @@ -166,7 +166,7 @@ public: UNIT_ASSERT_VALUES_EQUAL(msg->ResponseSz, 1); const TEvBlobStorage::TEvGetResult::TResponse& item = msg->Responses[0]; if (data) { - *data = item.Buffer; + *data = item.Buffer.ConvertToString(); } return item.Status; } diff --git a/ydb/core/blobstorage/groupinfo/ut/ya.make b/ydb/core/blobstorage/groupinfo/ut/ya.make index 2be8b98663d..b3787a39aed 100644 --- a/ydb/core/blobstorage/groupinfo/ut/ya.make +++ b/ydb/core/blobstorage/groupinfo/ut/ya.make @@ -25,7 +25,7 @@ SRCS( blobstorage_groupinfo_ut.cpp ) -IF (BUILD_TYPE == "RELEASE") +IF (BUILD_TYPE != "DEBUG") SRCS( blobstorage_groupinfo_blobmap_ut.cpp blobstorage_groupinfo_partlayout_ut.cpp diff --git a/ydb/core/blobstorage/other/mon_get_blob_page.cpp b/ydb/core/blobstorage/other/mon_get_blob_page.cpp index 95308879a33..8814ebd17f1 100644 --- a/ydb/core/blobstorage/other/mon_get_blob_page.cpp +++ b/ydb/core/blobstorage/other/mon_get_blob_page.cpp @@ -59,7 +59,7 @@ namespace { } else { result.LogoBlobId = msg->Responses[0].Id; result.Status = msg->Responses[0].Status; - result.Buffer = msg->Responses[0].Buffer; + result.Buffer = msg->Responses[0].Buffer.ConvertToString(); result.PartMap = std::move(msg->Responses[0].PartMap); } result.DebugInfo = std::move(msg->DebugInfo); diff --git a/ydb/core/blobstorage/pdisk/ut/ya.make b/ydb/core/blobstorage/pdisk/ut/ya.make index f6d01054c4d..ff8f3e0ce4b 100644 --- a/ydb/core/blobstorage/pdisk/ut/ya.make +++ b/ydb/core/blobstorage/pdisk/ut/ya.make @@ -39,7 +39,7 @@ SRCS( mock/pdisk_mock.cpp ) -IF (BUILD_TYPE == "RELEASE") +IF (BUILD_TYPE != "DEBUG") SRCS( blobstorage_pdisk_ut_yard.cpp ) diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp index ec262c1fb80..8751595d1f9 100644 --- a/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp @@ -169,7 +169,7 @@ void VerifyTEvGetResult(TAutoPtr<TEventHandle<TEvBlobStorage::TEvGetResult>> res UNIT_ASSERT_VALUES_UNEQUAL(responses[0].Status, NKikimrProto::NODATA); } if (responses[0].Status == NKikimrProto::OK && !isIndexOnly) { - UNIT_ASSERT_VALUES_EQUAL(responses[0].Buffer, blob.Data); + UNIT_ASSERT_VALUES_EQUAL(responses[0].Buffer.ConvertToString(), blob.Data); } } else if (blob.Status == TBlobInfo::EStatus::UNKNOWN) { if (mustRestoreFirst && responses[0].Status == NKikimrProto::OK) { @@ -255,7 +255,7 @@ void VerifyTEvGetResult(TAutoPtr<TEventHandle<TEvBlobStorage::TEvGetResult>> res } else if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { UNIT_ASSERT_VALUES_UNEQUAL(responses[i].Status, NKikimrProto::NODATA); if (responses[i].Status == NKikimrProto::OK && !isIndexOnly) { - UNIT_ASSERT_VALUES_EQUAL(responses[i].Buffer, blobs[i].Data); + UNIT_ASSERT_VALUES_EQUAL(responses[i].Buffer.ConvertToString(), blobs[i].Data); } } else if (blobs[i].Status == TBlobInfo::EStatus::UNKNOWN) { if (mustRestoreFirst && responses[i].Status == NKikimrProto::OK) { diff --git a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp index f3397ec388b..3b26ca9e223 100644 --- a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp @@ -39,7 +39,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { UNIT_ASSERT(getResult); UNIT_ASSERT_VALUES_EQUAL(getResult->ResponseSz, 1); UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Status, status); - UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer, data); + UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer.ConvertToString(), data); }; void SendCollect(const TTestInfo &test, const TLogoBlobID &blobId, diff --git a/ydb/core/blobstorage/ut_blobstorage/discover.cpp b/ydb/core/blobstorage/ut_blobstorage/discover.cpp index 976ad9f4904..ab65e20e055 100644 --- a/ydb/core/blobstorage/ut_blobstorage/discover.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/discover.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(Discover) { UNIT_ASSERT_VALUES_EQUAL(getResult->ResponseSz, 1); UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Status, status); if (status == NKikimrProto::EReplyStatus::OK) { - UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer, data); + UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer.ConvertToString(), data); } } diff --git a/ydb/core/blobstorage/ut_blobstorage/get.cpp b/ydb/core/blobstorage/ut_blobstorage/get.cpp index 3d3b101dc0b..c0ebf5da0a7 100644 --- a/ydb/core/blobstorage/ut_blobstorage/get.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/get.cpp @@ -29,7 +29,7 @@ Y_UNIT_TEST_SUITE(Get) { UNIT_ASSERT_VALUES_EQUAL(getResult->ResponseSz, 1); UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Status, status); if (checkResultData && status == NKikimrProto::EReplyStatus::OK) { - UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer, data); + UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer.ConvertToString(), data); } } diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/activity.h b/ydb/core/blobstorage/ut_blobstorage/lib/activity.h index 8c6f3bfd486..9e4c0cb7017 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/activity.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/activity.h @@ -122,7 +122,7 @@ public: UNIT_ASSERT_VALUES_EQUAL(response.Status, NKikimrProto::OK); UNIT_ASSERT_EQUAL(response.Status, NKikimrProto::OK); if (Committed.count(response.Id)) { - UNIT_ASSERT_VALUES_EQUAL(Committed.at(response.Id), response.Buffer); + UNIT_ASSERT_VALUES_EQUAL(Committed.at(response.Id), response.Buffer.ConvertToString()); Committed.erase(response.Id); } if (!--ReadsInFlight) { diff --git a/ydb/core/blobstorage/ut_blobstorage/patch.cpp b/ydb/core/blobstorage/ut_blobstorage/patch.cpp index b68c05b80ef..b35e3930dfb 100644 --- a/ydb/core/blobstorage/ut_blobstorage/patch.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/patch.cpp @@ -34,7 +34,7 @@ Y_UNIT_TEST_SUITE(BlobPatching) { UNIT_ASSERT(getResult); UNIT_ASSERT_VALUES_EQUAL(getResult->ResponseSz, 1); UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Status, status); - UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer, data); + UNIT_ASSERT_VALUES_EQUAL(getResult->Responses[0].Buffer.ConvertToString(), data); }; void SendPatch(const TTestInfo &test, const TLogoBlobID &originalBlobId, const TLogoBlobID &patchedBlobId, ui32 mask, diff --git a/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp b/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp index b0e49b75fdc..f88f6245bfe 100644 --- a/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp @@ -39,7 +39,7 @@ void Test() { auto& r = res->Get()->Responses[0]; UNIT_ASSERT_VALUES_EQUAL(r.Status, status); if (status == NKikimrProto::OK) { - UNIT_ASSERT_VALUES_EQUAL(r.Buffer, data); + UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data); } }; diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make index d0d30e53743..6fd28749c28 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ya.make @@ -31,7 +31,7 @@ SRCS( sync.cpp ) -IF (BUILD_TYPE == "RELEASE") +IF (BUILD_TYPE != "DEBUG") SRCS( big_cluster.cpp get.cpp diff --git a/ydb/core/blobstorage/ut_group/main.cpp b/ydb/core/blobstorage/ut_group/main.cpp index 07a0801130a..3c6b4ba17e5 100644 --- a/ydb/core/blobstorage/ut_group/main.cpp +++ b/ydb/core/blobstorage/ut_group/main.cpp @@ -532,7 +532,7 @@ public: Y_VERIFY(response.Status == NKikimrProto::OK); const auto it = Committed.find(response.Id); Y_VERIFY(it != Committed.end()); - Y_VERIFY(it->second == response.Buffer); + Y_VERIFY(it->second == response.Buffer.ConvertToString()); Committed.erase(it); } --readsInFlight; diff --git a/ydb/core/blobstorage/ut_pdiskfit/ut/ya.make b/ydb/core/blobstorage/ut_pdiskfit/ut/ya.make index 6f64babbe77..b4e9ff9707b 100644 --- a/ydb/core/blobstorage/ut_pdiskfit/ut/ya.make +++ b/ydb/core/blobstorage/ut_pdiskfit/ut/ya.make @@ -7,7 +7,7 @@ IF (OS_LINUX AND NOT WITH_VALGRIND) TAG(ya:fat) - IF (BUILD_TYPE == "RELEASE") + IF (BUILD_TYPE != "DEBUG") SRCS( main.cpp ) diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp index 6a03ebf0c5f..d3d1506342a 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp @@ -136,7 +136,7 @@ namespace NKikimr { return; } - Buffer = result->Responses[0].Buffer; + Buffer = result->Responses[0].Buffer.ConvertToString(); ApplyDiffs(); TInstant deadline = TActivationContext::Now() + TDuration::MilliSeconds(SubRequestDurationMs); diff --git a/ydb/core/client/server/msgbus_server_get.cpp b/ydb/core/client/server/msgbus_server_get.cpp index 326de522b12..b912bea015f 100644 --- a/ydb/core/client/server/msgbus_server_get.cpp +++ b/ydb/core/client/server/msgbus_server_get.cpp @@ -52,7 +52,7 @@ public: if (msg->Status == NKikimrProto::OK && msg->ResponseSz == 1) { const TEvBlobStorage::TEvGetResult::TResponse& resp = msg->Responses[0]; Response.SetStatus(resp.Status); - Response.SetBuffer(resp.Buffer); + Response.SetBuffer(resp.Buffer.ConvertToString()); } IssueResponse(ctx); } diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp index 4d91fef7ee4..9b248b62e67 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp @@ -294,7 +294,8 @@ public: "readItem.ValueOffset# " << readItem.ValueOffset << " readItem.BlobSize# " << readItem.BlobSize << " read.ValueSize# " << read.ValueSize); - memcpy(const_cast<char *>(read.Value.data()) + readItem.ValueOffset, response.Buffer.data(), response.Buffer.size()); + Y_VERIFY(read.Value.IsDetached()); + response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size()); IntermediateResult->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), batch.GroupId)] += response.Buffer.size(); // FIXME: count distinct blobs?" keyvalue_storage_request.cpp:279 IntermediateResult->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), batch.GroupId)] += 1; diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp index 2ce080f54b9..224c0b86cac 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp @@ -46,11 +46,16 @@ struct TBlobStorageMockState { response.Id = query.Id; response.Shift = query.Shift; response.RequestedSize = query.Size; - std::tie(response.Status, response.Buffer) = getBlob(query.Id); + + TString r; + std::tie(response.Status, r) = getBlob(query.Id); + if (response.Status == NKikimrProto::OK) { - TString buffer = TString::Uninitialized(query.Size); - memcpy(const_cast<char *>(buffer.data()), response.Buffer.data() + query.Shift, response.Buffer.size()); - response.Buffer = buffer; + const size_t shift = Min<size_t>(query.Shift, r.size()); + const size_t size = Min<size_t>(query.Size ? query.Size : Max<size_t>(), r.size() - shift); + TString buffer = TString::Uninitialized(size); + memcpy(buffer.Detach(), r.data() + shift, size); + response.Buffer = TRope(std::move(buffer)); } } return getResult; diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp index 1522e0b22f0..4f24557e6b8 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp @@ -280,7 +280,7 @@ public: auto groupId = ev->Get()->GroupId; decltype(request.ReadQueue)::iterator it = request.ReadQueue.begin(); for (ui32 i = 0, num = ev->Get()->ResponseSz; i < num; ++i, ++it) { - const auto& response = ev->Get()->Responses[i]; + auto& response = ev->Get()->Responses[i]; auto& read = *it->Read; auto& readItem = *it->ReadItem; @@ -290,7 +290,8 @@ public: } Y_VERIFY(response.Buffer.size() == readItem.BlobSize); Y_VERIFY(readItem.ValueOffset + readItem.BlobSize <= read.ValueSize); - memcpy(const_cast<char *>(read.Value.data()) + readItem.ValueOffset, response.Buffer.data(), response.Buffer.size()); + Y_VERIFY(read.Value.IsDetached()); + response.Buffer.begin().ExtractPlainDataAndAdvance(read.Value.Detach() + readItem.ValueOffset, response.Buffer.size()); IntermediateResults->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), groupId)] += response.Buffer.size(); IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs? } else { diff --git a/ydb/core/mind/ut_fat/blobstorage_node_warden_ut_fat.cpp b/ydb/core/mind/ut_fat/blobstorage_node_warden_ut_fat.cpp index d78e767c970..98ba8a7b5f5 100644 --- a/ydb/core/mind/ut_fat/blobstorage_node_warden_ut_fat.cpp +++ b/ydb/core/mind/ut_fat/blobstorage_node_warden_ut_fat.cpp @@ -349,7 +349,7 @@ Y_UNIT_TEST_SUITE(TBlobStorageWardenTest) { UNIT_ASSERT(getResult->ResponseSz == 1); UNIT_ASSERT(getResult->Responses.Get()); UNIT_ASSERT_EQUAL((getResult->Responses)[0].Buffer.size(), data.size()); - UNIT_ASSERT_EQUAL((getResult->Responses)[0].Buffer, data); + UNIT_ASSERT_EQUAL((getResult->Responses)[0].Buffer.ConvertToString(), data); } void VGet(TTestActorRuntime &runtime, TActorId &sender, ui32 groupId, ui32 nodeId, TLogoBlobID logoBlobId, diff --git a/ydb/core/tablet_flat/flat_bio_actor.cpp b/ydb/core/tablet_flat/flat_bio_actor.cpp index 9bbb4e1a211..d91ae40319b 100644 --- a/ydb/core/tablet_flat/flat_bio_actor.cpp +++ b/ydb/core/tablet_flat/flat_bio_actor.cpp @@ -161,10 +161,7 @@ void TBlockIO::Handle(ui32 base, TArrayRef<TLoaded> items) noexcept auto& state = BlockStates.at(brick.Slot); Y_VERIFY(state.Data.size() - state.Offset >= piece.Buffer.size()); - ::memcpy( - state.Data.mutable_data() + state.Offset, - piece.Buffer.data(), - piece.Buffer.size()); + piece.Buffer.begin().ExtractPlainDataAndAdvance(state.Data.mutable_data() + state.Offset, piece.Buffer.size()); state.Offset += piece.Buffer.size(); } diff --git a/ydb/core/tablet_flat/flat_load_blob_queue.cpp b/ydb/core/tablet_flat/flat_load_blob_queue.cpp index 3c97b4a5359..ef6369f010d 100644 --- a/ydb/core/tablet_flat/flat_load_blob_queue.cpp +++ b/ydb/core/tablet_flat/flat_load_blob_queue.cpp @@ -171,11 +171,13 @@ bool TLoadBlobQueue::ProcessResult(TEvBlobStorage::TEvGetResult* msg) { ActiveBytesInFly -= x->Id.BlobSize(); + TString buffer = x->Buffer.ConvertToString(); + if (std::next(p.first) == p.second) { // Common case: unique load of the blob auto item = p.first->second; Active.erase(p.first); - item.Load->OnBlobLoaded(x->Id, std::move(x->Buffer), item.Cookie); + item.Load->OnBlobLoaded(x->Id, std::move(buffer), item.Cookie); } else { // May rarely happen: concurrent load of the same blob TVector<TActiveItem> items; @@ -188,10 +190,10 @@ bool TLoadBlobQueue::ProcessResult(TEvBlobStorage::TEvGetResult* msg) { size_t last = items.size() - 1; for (size_t i = 0; i < last; ++i) { // We have to make a copy of the buffer - items[i].Load->OnBlobLoaded(x->Id, x->Buffer, items[i].Cookie); + items[i].Load->OnBlobLoaded(x->Id, buffer, items[i].Cookie); } // Last load may consume our buffer - items[last].Load->OnBlobLoaded(x->Id, std::move(x->Buffer), items[last].Cookie); + items[last].Load->OnBlobLoaded(x->Id, std::move(buffer), items[last].Cookie); } } diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index 4d1c8ca3325..a0edefcbf94 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -520,7 +520,7 @@ private: if (res.Status == NKikimrProto::EReplyStatus::NODATA) { fallbackRanges[blobRanges[i].BlobId].emplace_back(std::move(blobRanges[i])); } else { - ProcessSingleRangeResult(blobRanges[i], readCookie, res.Status, res.Buffer, ctx); + ProcessSingleRangeResult(blobRanges[i], readCookie, res.Status, res.Buffer.ConvertToString(), ctx); } } |