diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2025-03-05 14:50:19 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-05 11:50:19 +0000 |
commit | 18c06581a73311b11d3008b1bccd606147e19da0 (patch) | |
tree | 441eb544a8086acf74cd79680c8f00082ae3d587 | |
parent | 9908e6c91516d60ca5b96ef2f14722ca93366763 (diff) | |
download | ydb-18c06581a73311b11d3008b1bccd606147e19da0.tar.gz |
Add S3 support into BlobDepot (#14979)
39 files changed, 1884 insertions, 269 deletions
diff --git a/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py b/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py index 061e37b58c1..54bc3aca8ac 100644 --- a/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py +++ b/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py @@ -2,6 +2,8 @@ import ydb.apps.dstool.lib.common as common import ydb.core.protos.blob_depot_config_pb2 as blob_depot_config import sys import time +from argparse import FileType +import google.protobuf.json_format as pb_json description = 'Create virtual group backed by BlobDepot' @@ -17,11 +19,18 @@ def add_options(p): p.add_argument('--log-channel-sp', type=str, metavar='POOL_NAME', required=True, help='channel 0 specifier') p.add_argument('--snapshot-channel-sp', type=str, metavar='POOL_NAME', help='channel 1 specifier (defaults to channel 0)') p.add_argument('--data-channel-sp', type=str, metavar='POOL_NAME[*COUNT]', nargs='+', required=True, help='data channel specifier') + p.add_argument('--s3-settings', type=FileType('r', encoding='utf-8'), metavar='JSON_FILE', help='path to JSON file containing S3 settings') p.add_argument('--wait', action='store_true', help='wait for operation to complete by polling') def do(args): request = common.create_bsc_request(args) + + if args.s3_settings: + s3_settings = args.s3_settings.read() + else: + s3_settings = None + for name in args.name: cmd = request.Command.add().AllocateVirtualGroup @@ -41,6 +50,9 @@ def do(args): print(f'Invalid --storage-pool-id={args.storage_pool_id} format, <number>:<number> expected', file=sys.stderr) sys.exit(1) + if s3_settings is not None: + pb_json.Parse(s3_settings, cmd.S3BackendSettings) + cmd.ChannelProfiles.add(StoragePoolName=args.log_channel_sp, ChannelKind=blob_depot_config.TChannelKind.System) chan1 = args.snapshot_channel_sp if args.snapshot_channel_sp is not None else args.log_channel_sp cmd.ChannelProfiles.add(StoragePoolName=chan1, ChannelKind=blob_depot_config.TChannelKind.System) diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 552e53213da..b925d66fcdf 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -1,6 +1,7 @@ #include "blob_depot_tablet.h" #include "data.h" #include "space_monitor.h" +#include "s3.h" namespace NKikimr::NBlobDepot { @@ -30,6 +31,12 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::OnAgentDisconnect(TAgent& agent) { agent.InvalidateStepRequests.clear(); agent.PushCallbacks.clear(); + + for (TS3Locator locator : agent.S3WritesInFlight) { + // they were not in InFlightTrashS3, so we just have to delete them + S3Manager->AddTrashToCollect(locator); + } + agent.S3WritesInFlight.clear(); } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { @@ -79,6 +86,14 @@ namespace NKikimr::NBlobDepot { record->SetDecommitGroupId(Config.GetVirtualGroupId()); } + if (Config.HasS3BackendSettings()) { + record->MutableS3BackendSettings()->CopyFrom(Config.GetS3BackendSettings()); + } + + if (Config.HasName()) { + record->SetName(Config.GetName()); + } + TActivationContext::Send(response.release()); if (!agent.InvalidatedStepInFlight.empty()) { diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index e4edfdaa237..6adf340e63a 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -5,6 +5,8 @@ #include <ydb/core/protos/blob_depot_config.pb.h> +#include <ydb/core/wrappers/abstract.h> + namespace NKikimr::NBlobDepot { #define ENUMERATE_INCOMING_EVENTS(XX) \ @@ -129,6 +131,7 @@ namespace NKikimr::NBlobDepot { TEvBlobDepot::TEvCollectGarbageResult*, TEvBlobDepot::TEvCommitBlobSeqResult*, TEvBlobDepot::TEvResolveResult*, + TEvBlobDepot::TEvPrepareWriteS3Result*, // underlying DS proxy responses TEvBlobStorage::TEvGetResult*, @@ -233,6 +236,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvPrepareWriteS3Result, HandleTabletResponse); hFunc(TEvBlobStorage::TEvGetResult, HandleOtherResponse); hFunc(TEvBlobStorage::TEvPutResult, HandleOtherResponse); @@ -255,6 +259,9 @@ namespace NKikimr::NBlobDepot { void PassAway() override { ClearPendingEventQueue("BlobDepot agent destroyed"); NTabletPipe::CloseAndForgetClient(SelfId(), PipeId); + if (S3WrapperId) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0)); + } TActor::PassAway(); } @@ -318,6 +325,11 @@ namespace NKikimr::NBlobDepot { NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColor = {}; float ApproximateFreeSpaceShare = 0.0f; + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + std::optional<NKikimrBlobDepot::TS3BackendSettings> S3BackendSettings; + TActorId S3WrapperId; + TString S3BasePath; + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); void ConnectToBlobDepot(); @@ -447,6 +459,8 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); void HandleQueryWatchdog(); + void Invoke(std::function<void()> callback) { callback(); } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct TChannelKind diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 1b92ee886ad..82abae69791 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -1,6 +1,8 @@ #include "agent_impl.h" #include "blocks.h" +#include <ydb/core/wrappers/s3_wrapper.h> + namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { @@ -86,6 +88,24 @@ namespace NKikimr::NBlobDepot { SpaceColor = msg.GetSpaceColor(); ApproximateFreeSpaceShare = msg.GetApproximateFreeSpaceShare(); + S3BackendSettings = msg.HasS3BackendSettings() + ? std::make_optional(msg.GetS3BackendSettings()) + : std::nullopt; + + if (S3WrapperId) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0)); + S3WrapperId = {}; + } + + if (S3BackendSettings) { + auto& settings = S3BackendSettings->GetSettings(); + ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings); + S3WrapperId = Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); + S3BasePath = TStringBuilder() << settings.GetObjectKeyPattern() << '/' << msg.GetName(); + } else { + ExternalStorageConfig = {}; + } + OnConnect(); } @@ -171,6 +191,7 @@ namespace NKikimr::NBlobDepot { template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCommitBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context); + template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvPrepareWriteS3 msg, TRequestSender *sender, TRequestContext::TPtr context); ui64 TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) { const ui64 id = NextTabletRequestId++; diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 9b1e5519955..653ef5f5252 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -72,25 +72,22 @@ namespace NKikimr::NBlobDepot { ui32 Size; ui64 OutputOffset; }; + struct TS3ReadItem { + TString Key; + ui32 Offset; + ui32 Size; + ui64 OutputOffset; + }; std::vector<TReadItem> items; + std::vector<TS3ReadItem> s3items; ui64 offset = arg.Offset; ui64 size = arg.Size; for (const auto& value : arg.Value.Chain) { - const ui32 groupId = value.GroupId; - const auto& blobId = value.BlobId; const ui32 begin = value.SubrangeBegin; const ui32 end = value.SubrangeEnd; - if (end <= begin || blobId.BlobSize() < end) { - error = "incorrect SubrangeBegin/SubrangeEnd pair"; - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size), - (Value, arg.Value)); - return false; - } - // calculate the whole length of current part ui64 partLen = end - begin; if (offset >= partLen) { @@ -103,7 +100,26 @@ namespace NKikimr::NBlobDepot { partLen = Min(size ? size : Max<ui64>(), partLen - offset); Y_ABORT_UNLESS(partLen); - items.push_back(TReadItem{groupId, blobId, ui32(offset + begin), ui32(partLen), outputOffset}); + ui32 itemLen = 0; + ui32 partOffset = offset + begin; + + if (value.Blob) { + const auto& [blobId, groupId] = *value.Blob; + items.push_back(TReadItem{groupId, blobId, partOffset, ui32(partLen), outputOffset}); + itemLen = blobId.BlobSize(); + } else if (const auto& locator = value.S3Locator) { + TString key = locator->MakeObjectName(Agent.S3BasePath); + s3items.push_back(TS3ReadItem{std::move(key), partOffset, ui32(partLen), outputOffset}); + itemLen = locator->Len; + } + + if (end <= begin || itemLen < end) { + error = "incorrect SubrangeBegin/SubrangeEnd pair"; + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size), + (Value, arg.Value)); + return false; + } outputOffset += partLen; offset = 0; @@ -156,6 +172,85 @@ namespace NKikimr::NBlobDepot { ++context->NumPartsPending; } + for (TS3ReadItem& item : s3items) { + class TGetActor : public TActor<TGetActor> { + size_t OutputOffset; + std::shared_ptr<TReadContext> ReadContext; + TQuery* const Query; + + TString AgentLogId; + TString QueryId; + ui64 ReadId; + + public: + TGetActor(size_t outputOffset, std::shared_ptr<TReadContext> readContext, TQuery *query) + : TActor(&TThis::StateFunc) + , OutputOffset(outputOffset) + , ReadContext(std::move(readContext)) + , Query(query) + , AgentLogId(query->Agent.LogId) + , QueryId(query->GetQueryId()) + , ReadId(ReadContext->GetTag()) + {} + + void Handle(NWrappers::TEvExternalStorage::TEvGetObjectResponse::TPtr ev) { + auto& msg = *ev->Get(); + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA55, "received TEvGetObjectResponse", + (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId), + (Response, msg.Result), (BodyLen, std::size(msg.Body))); + + if (msg.IsSuccess()) { + Finish(std::move(msg.Body), ""); + } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) { + Finish(std::nullopt, "data has disappeared from S3"); + } else { + Finish(std::nullopt, msg.GetError().GetMessage().c_str()); + } + } + + void HandleUndelivered() { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA56, "received TEvUndelivered", + (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId)); + Finish(std::nullopt, "wrapper actor terminated"); + } + + void Finish(std::optional<TString> data, const char *error) { + auto& context = *ReadContext; + if (!context.Terminated && !context.StopProcessingParts) { + if (data) { + context.Buffer.Write(OutputOffset, TRope(std::move(*data))); + if (!--context.NumPartsPending) { + context.EndWithSuccess(Query); + } + } else { + context.EndWithError(Query, NKikimrProto::ERROR, TStringBuilder() + << "failed to fetch data from S3: " << error); + } + } + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(NWrappers::TEvExternalStorage::TEvGetObjectResponse, Handle) + cFunc(TEvents::TSystem::Undelivered, HandleUndelivered) + cFunc(TEvents::TSystem::Poison, PassAway) + ) + }; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA57, "starting S3 read", (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, context->GetTag()), (Key, item.Key), (Offset, item.Offset), (Size, item.Size), + (OutputOffset, item.OutputOffset)); + const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(item.OutputOffset, context, this)); + auto request = std::make_unique<NWrappers::TEvExternalStorage::TEvGetObjectRequest>( + Aws::S3::Model::GetObjectRequest() + .WithBucket(Agent.S3BackendSettings->GetSettings().GetBucket()) + .WithKey(std::move(item.Key)) + .WithRange(TStringBuilder() << "bytes=" << item.Offset << '-' << item.Offset + item.Size - 1) + ); + TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, actorId, request.release(), IEventHandle::FlagTrackDelivery)); + ++context->NumPartsPending; + } + Y_ABORT_UNLESS(context->NumPartsPending); return true; diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 6527bf6f7e2..8af4498e93d 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -97,6 +97,7 @@ namespace NKikimr::NBlobDepot { template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvPrepareWriteS3Result::TPtr ev); template<typename TEvent> void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { diff --git a/ydb/core/blob_depot/agent/resolved_value.cpp b/ydb/core/blob_depot/agent/resolved_value.cpp index 32af8762ad4..b32e6c30c0c 100644 --- a/ydb/core/blob_depot/agent/resolved_value.cpp +++ b/ydb/core/blob_depot/agent/resolved_value.cpp @@ -3,16 +3,36 @@ namespace NKikimr::NBlobDepot { TResolvedValue::TLink::TLink(const NKikimrBlobDepot::TResolvedValueChain& link) - : BlobId(LogoBlobIDFromLogoBlobID(link.GetBlobId())) - , GroupId(link.GetGroupId()) - , SubrangeBegin(link.GetSubrangeBegin()) - , SubrangeEnd(link.HasSubrangeEnd() ? link.GetSubrangeEnd() : BlobId.BlobSize()) + : SubrangeBegin(link.GetSubrangeBegin()) + , SubrangeEnd(link.GetSubrangeEnd()) { - Y_DEBUG_ABORT_UNLESS(link.HasBlobId() && link.HasGroupId()); + std::optional<ui32> length; + if (link.HasBlobId() && link.HasGroupId()) { + const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(link.GetBlobId()); + Blob.emplace(blobId, link.GetGroupId()); + Y_VERIFY_S(!length || *length == blobId.BlobSize(), SingleLineProto(link)); + length.emplace(blobId.BlobSize()); + } + if (link.HasS3Locator()) { + S3Locator.emplace(TS3Locator::FromProto(link.GetS3Locator())); + Y_VERIFY_S(!length || *length == S3Locator->Len, SingleLineProto(link)); + length.emplace(S3Locator->Len); + } + if (!link.HasSubrangeEnd()) { + Y_VERIFY_S(length, SingleLineProto(link)); + SubrangeEnd = *length; + } } void TResolvedValue::TLink::Output(IOutputStream& s) const { - s << BlobId << '@' << GroupId << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}'; + if (Blob) { + const auto& [blobId, groupId] = *Blob; + s << blobId << '@' << groupId; + } + if (S3Locator) { + s << S3Locator->ToString(); + } + s << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}'; } TString TResolvedValue::TLink::ToString() const { diff --git a/ydb/core/blob_depot/agent/resolved_value.h b/ydb/core/blob_depot/agent/resolved_value.h index a3194e65ec6..7429c3b59c1 100644 --- a/ydb/core/blob_depot/agent/resolved_value.h +++ b/ydb/core/blob_depot/agent/resolved_value.h @@ -6,8 +6,8 @@ namespace NKikimr::NBlobDepot { struct TResolvedValue { struct TLink { - TLogoBlobID BlobId; - ui32 GroupId; + std::optional<std::tuple<TLogoBlobID, ui32>> Blob; + std::optional<TS3Locator> S3Locator; ui32 SubrangeBegin; ui32 SubrangeEnd; @@ -16,10 +16,7 @@ namespace NKikimr::NBlobDepot { void Output(IOutputStream& s) const; TString ToString() const; - friend bool operator ==(const TLink& x, const TLink& y) { - return x.BlobId == y.BlobId && x.GroupId == y.GroupId && x.SubrangeBegin == y.SubrangeBegin && - x.SubrangeEnd == y.SubrangeEnd; - } + friend std::strong_ordering operator <=>(const TLink&, const TLink&) = default; }; bool Defined = false; diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index a1d8aeb6f20..5ead9d58a5e 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -15,19 +15,36 @@ namespace NKikimr::NBlobDepot { bool WrittenBeyondBarrier = false; NKikimrBlobDepot::TEvCommitBlobSeq CommitBlobSeq; TBlobSeqId BlobSeqId; + std::optional<TS3Locator> LocatorInFlight; + TActorId WriterActorId; + + struct TLifetimeToken {}; + std::shared_ptr<TLifetimeToken> LifetimeToken; public: using TBlobStorageQuery::TBlobStorageQuery; void OnDestroy(bool success) override { - if (IsInFlight) { + if (IsInFlight || LocatorInFlight) { Y_ABORT_UNLESS(!success); - RemoveBlobSeqFromInFlight(); + if (IsInFlight) { + RemoveBlobSeqFromInFlight(); + } NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg; - BlobSeqId.ToProto(msg.AddItems()); + if (IsInFlight) { + BlobSeqId.ToProto(msg.AddItems()); + } + if (LocatorInFlight) { + LocatorInFlight->ToProto(msg.AddS3Locators()); + } Agent.Issue(std::move(msg), this, nullptr); } + if (WriterActorId) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, WriterActorId, Agent.SelfId(), + nullptr, 0)); + } + TBlobStorageQuery::OnDestroy(success); } @@ -67,6 +84,24 @@ namespace NKikimr::NBlobDepot { void IssuePuts() { Y_ABORT_UNLESS(!PutsIssued); + auto prepare = [&] { + Y_ABORT_UNLESS(CommitBlobSeq.ItemsSize() == 0); + auto *commitItem = CommitBlobSeq.AddItems(); + commitItem->SetKey(Request.Id.AsBinaryString()); + for (const auto& [tabletId, generation] : Request.ExtraBlockChecks) { + auto *p = commitItem->AddExtraBlockChecks(); + p->SetTabletId(tabletId); + p->SetGeneration(generation); + } + return commitItem; + }; + + if (const auto& s3 = Agent.S3BackendSettings; s3 && s3->HasSyncMode()) { + auto *commitItem = prepare(); + commitItem->MutableS3Locator(); + return IssueS3Put(); + } + const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data); if (it == Agent.ChannelKinds.end()) { return EndWithError(NKikimrProto::ERROR, "no Data channels"); @@ -90,9 +125,7 @@ namespace NKikimr::NBlobDepot { BDEV_QUERY(BDEV09, "TEvPut_new", (U.BlobId, Request.Id), (U.BufferSize, Request.Buffer.size()), (U.HandleClass, Request.HandleClass)); - Y_ABORT_UNLESS(CommitBlobSeq.ItemsSize() == 0); - auto *commitItem = CommitBlobSeq.AddItems(); - commitItem->SetKey(Request.Id.AsBinaryString()); + auto *commitItem = prepare(); auto *locator = commitItem->MutableBlobLocator(); BlobSeqId.ToProto(locator->MutableBlobSeqId()); //locator->SetChecksum(Crc32c(Request.Buffer.data(), Request.Buffer.size())); @@ -192,15 +225,23 @@ namespace NKikimr::NBlobDepot { } void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { - if (auto *p = std::get_if<TEvBlobStorage::TEvPutResult*>(&response)) { - HandlePutResult(std::move(context), **p); - } else if (auto *p = std::get_if<TEvBlobDepot::TEvCommitBlobSeqResult*>(&response)) { - HandleCommitBlobSeqResult(std::move(context), (*p)->Record); - } else if (std::holds_alternative<TTabletDisconnected>(response)) { - EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); - } else { - Y_ABORT("unexpected response"); - } + std::visit(TOverloaded{ + [&](TEvBlobStorage::TEvPutResult *ev) { + HandlePutResult(std::move(context), *ev); + }, + [&](TEvBlobDepot::TEvCommitBlobSeqResult *ev) { + HandleCommitBlobSeqResult(std::move(context), ev->Record); + }, + [&](TTabletDisconnected) { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + }, + [&](TEvBlobDepot::TEvPrepareWriteS3Result *ev) { + HandlePrepareWriteS3Result(std::move(context), ev->Record); + }, + [&](auto /*other*/) { + Y_ABORT("unexpected response"); + } + }, response); } void HandlePutResult(TRequestContext::TPtr /*context*/, TEvBlobStorage::TEvPutResult& msg) { @@ -284,6 +325,107 @@ namespace NKikimr::NBlobDepot { ui64 GetTabletId() const override { return Request.Id.TabletID(); } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + // writing directly to S3 + + void IssueS3Put() { + NKikimrBlobDepot::TEvPrepareWriteS3 query; + auto *item = query.AddItems(); + item->SetKey(Request.Id.AsBinaryString()); + for (const auto& [tabletId, generation] : Request.ExtraBlockChecks) { + auto *p = item->AddExtraBlockChecks(); + p->SetTabletId(tabletId); + p->SetGeneration(generation); + } + item->SetLen(Request.Id.BlobSize()); + Agent.Issue(query, this, nullptr); + } + + void HandlePrepareWriteS3Result(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvPrepareWriteS3Result& msg) { + Y_ABORT_UNLESS(msg.ItemsSize() == 1); + const auto& item = msg.GetItems(0); + if (item.GetStatus() != NKikimrProto::OK) { + return EndWithError(item.GetStatus(), item.GetErrorReason()); + } + + auto *commitItem = CommitBlobSeq.MutableItems(0); + auto *locator = commitItem->MutableS3Locator(); + locator->CopyFrom(item.GetS3Locator()); + + LocatorInFlight.emplace(TS3Locator::FromProto(*locator)); + + class TWriteActor : public TActor<TWriteActor> { + std::weak_ptr<TLifetimeToken> LifetimeToken; + TPutQuery* const PutQuery; + + public: + TWriteActor(std::weak_ptr<TLifetimeToken> lifetimeToken, TPutQuery *putQuery) + : TActor(&TThis::StateFunc) + , LifetimeToken(std::move(lifetimeToken)) + , PutQuery(putQuery) + {} + + void Handle(NWrappers::TEvExternalStorage::TEvPutObjectResponse::TPtr ev) { + auto& msg = *ev->Get(); + Finish(msg.IsSuccess() + ? std::nullopt + : std::make_optional<TString>(msg.GetError().GetMessage())); + } + + void HandleUndelivered() { + Finish("event undelivered"); + } + + void Finish(std::optional<TString>&& error) { + if (!LifetimeToken.expired()) { + InvokeOtherActor(PutQuery->Agent, &TBlobDepotAgent::Invoke, [&] { + PutQuery->OnPutS3ObjectResponse(std::move(error)); + }); + } + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(NWrappers::TEvExternalStorage::TEvPutObjectResponse, Handle) + cFunc(TEvents::TSystem::Undelivered, HandleUndelivered) + cFunc(TEvents::TSystem::Poison, PassAway) + ) + }; + + TString key = TS3Locator::FromProto(*locator).MakeObjectName(Agent.S3BasePath); + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA54, "starting WriteActor", (AgentId, Agent.LogId), + (QueryId, GetQueryId()), (Key, key)); + + LifetimeToken = std::make_shared<TLifetimeToken>(); + WriterActorId = Agent.RegisterWithSameMailbox(new TWriteActor(LifetimeToken, this)); + + TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, WriterActorId, + new NWrappers::TEvExternalStorage::TEvPutObjectRequest( + Aws::S3::Model::PutObjectRequest() + .WithBucket(std::move(Agent.S3BackendSettings->GetSettings().GetBucket())) + .WithKey(std::move(key)) + .AddMetadata("key", Request.Id.ToString()), + Request.Buffer.ExtractUnderlyingContainerOrCopy<TString>()), + IEventHandle::FlagTrackDelivery)); + } + + void OnPutS3ObjectResponse(std::optional<TString>&& error) { + STLOG(error ? PRI_WARN : PRI_DEBUG, BLOB_DEPOT_AGENT, BDA53, "OnPutS3ObjectResponse", + (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Error, error)); + + WriterActorId = {}; + + if (error) { + // LocatorInFlight is not reset here on purpose: OnDestroy will generate spoiled blob message to the + // tablet + EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to put object to S3: " << *error); + } else { + LocatorInFlight.reset(); + IssueCommitBlobSeq(false); + } + } }; return new TPutQuery(*this, std::move(ev)); diff --git a/ydb/core/blob_depot/agent/ya.make b/ydb/core/blob_depot/agent/ya.make index 82195569e12..515da83e268 100644 --- a/ydb/core/blob_depot/agent/ya.make +++ b/ydb/core/blob_depot/agent/ya.make @@ -37,6 +37,7 @@ LIBRARY() ydb/core/blobstorage/vdisk/common ydb/core/blob_depot ydb/core/protos + ydb/core/wrappers ) END() diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 19cc2596242..78abeb4ca17 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -3,6 +3,7 @@ #include "blocks.h" #include "garbage_collection.h" #include "data.h" +#include "s3.h" #include "data_uncertain.h" #include "space_monitor.h" @@ -21,6 +22,7 @@ namespace NKikimr::NBlobDepot { , BlocksManager(new TBlocksManager(this)) , BarrierServer(new TBarrierServer(this)) , Data(new TData(this)) + , S3Manager(new TS3Manager(this)) , SpaceMonitor(new TSpaceMonitor(this)) , JsonHandler(std::bind(&TBlobDepot::RenderJson, this, std::placeholders::_1), TEvPrivate::EvJsonTimer, TEvPrivate::EvJsonUpdate) {} @@ -39,6 +41,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle); hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle); hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); + hFunc(TEvBlobDepot::TEvPrepareWriteS3, Handle); default: Y_ABORT(); @@ -110,6 +113,7 @@ namespace NKikimr::NBlobDepot { fFunc(TEvBlobDepot::EvQueryBlocks, handleFromAgentPipe); fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe); fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe); + fFunc(TEvBlobDepot::EvPrepareWriteS3, handleFromAgentPipe); fFunc(TEvPrivate::EvDeliver, handleDelivery); @@ -132,6 +136,10 @@ namespace NKikimr::NBlobDepot { cFunc(TEvPrivate::EvJsonTimer, JsonHandler.HandleTimer); cFunc(TEvPrivate::EvJsonUpdate, JsonHandler.HandleUpdate); + fFunc(TEvPrivate::EvUploadResult, S3Manager->Handle); + fFunc(TEvPrivate::EvDeleteResult, S3Manager->Handle); + fFunc(TEvPrivate::EvScanFound, S3Manager->Handle); + default: if (!HandleDefaultEvents(ev, SelfId())) { Y_ABORT("unexpected event Type# 0x%08" PRIx32, type); @@ -144,12 +152,14 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::PassAway() { - for (const TActorId& actorId : {GroupAssimilatorId}) { + for (const TActorId& actorId : {GroupAssimilatorId, S3Manager->GetWrapperId()}) { if (actorId) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0)); } } + S3Manager->TerminateAllActors(); + TActor::PassAway(); } diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 96aff19f075..b881a588dde 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -34,6 +34,10 @@ namespace NKikimr::NBlobDepot { EvDeliver, EvJsonTimer, EvJsonUpdate, + EvUploadResult, + EvDeleteResult, + EvScanFound, + EvScanContinue, }; }; @@ -77,6 +81,8 @@ namespace NKikimr::NBlobDepot { NKikimrBlobStorage::TPDiskSpaceColor::E LastPushedSpaceColor = {}; float LastPushedApproximateFreeSpaceShare = 0.0f; + + THashSet<TS3Locator> S3WritesInFlight; }; struct TPipeServerContext { @@ -179,6 +185,7 @@ namespace NKikimr::NBlobDepot { KickSpaceMonitor(); StartDataLoad(); UpdateThroughputs(); + InitS3Manager(); } void StartDataLoad(); @@ -269,6 +276,15 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); void Handle(TEvBlobDepot::TEvDiscardSpoiledBlobSeq::TPtr ev); + void Handle(TEvBlobDepot::TEvPrepareWriteS3::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // S3 operations + + class TS3Manager; + std::unique_ptr<TS3Manager> S3Manager; + + void InitS3Manager(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Space monitoring diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index cd160a07dc3..7abb5c522b8 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -1,6 +1,7 @@ #include "data.h" #include "data_uncertain.h" #include "garbage_collection.h" +#include "s3.h" namespace NKikimr::NBlobDepot { @@ -22,7 +23,7 @@ namespace NKikimr::NBlobDepot { bool TData::TValue::Validate(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) { if (!item.HasBlobLocator()) { - return false; + return true; } const auto& locator = item.GetBlobLocator(); return locator.HasGroupId() && @@ -42,28 +43,32 @@ namespace NKikimr::NBlobDepot { return false; } - Y_DEBUG_ABORT_UNLESS(chain.HasLocator()); - const auto& locator1 = chain.GetLocator(); - Y_DEBUG_ABORT_UNLESS(locator1.HasGroupId() && locator1.HasBlobSeqId() && locator1.HasTotalDataLen()); + if (chain.HasBlobLocator() != item.HasBlobLocator()) { + return false; + } else if (chain.HasBlobLocator() && item.HasBlobLocator()) { + Y_DEBUG_ABORT_UNLESS(chain.HasBlobLocator()); + const auto& locator1 = chain.GetBlobLocator(); + Y_DEBUG_ABORT_UNLESS(locator1.HasGroupId() && locator1.HasBlobSeqId() && locator1.HasTotalDataLen()); - Y_DEBUG_ABORT_UNLESS(item.HasBlobLocator()); - const auto& locator2 = item.GetBlobLocator(); - Y_DEBUG_ABORT_UNLESS(locator2.HasGroupId() && locator2.HasBlobSeqId() && locator2.HasTotalDataLen()); + Y_DEBUG_ABORT_UNLESS(item.HasBlobLocator()); + const auto& locator2 = item.GetBlobLocator(); + Y_DEBUG_ABORT_UNLESS(locator2.HasGroupId() && locator2.HasBlobSeqId() && locator2.HasTotalDataLen()); #define COMPARE_FIELD(NAME) \ - if (locator1.Has##NAME() != locator2.Has##NAME()) { \ - return false; \ - } else if (locator1.Has##NAME() && locator1.Get##NAME() != locator2.Get##NAME()) { \ - return false; \ - } - COMPARE_FIELD(GroupId) - COMPARE_FIELD(Checksum) - COMPARE_FIELD(TotalDataLen) - COMPARE_FIELD(FooterLen) + if (locator1.Has##NAME() != locator2.Has##NAME()) { \ + return false; \ + } else if (locator1.Has##NAME() && locator1.Get##NAME() != locator2.Get##NAME()) { \ + return false; \ + } + COMPARE_FIELD(GroupId) + COMPARE_FIELD(Checksum) + COMPARE_FIELD(TotalDataLen) + COMPARE_FIELD(FooterLen) #undef COMPARE_FIELD - if (TBlobSeqId::FromProto(locator1.GetBlobSeqId()) != TBlobSeqId::FromProto(locator2.GetBlobSeqId())) { - return false; + if (TBlobSeqId::FromProto(locator1.GetBlobSeqId()) != TBlobSeqId::FromProto(locator2.GetBlobSeqId())) { + return false; + } } return true; @@ -87,7 +92,9 @@ namespace NKikimr::NBlobDepot { { auto& [key, value] = *it; - std::vector<TLogoBlobID> deleteQ; + std::vector<TLogoBlobID> deleteBlobs; + std::vector<TS3Locator> deleteS3; + const bool uncertainWriteBefore = value.UncertainWrite; const bool wasUncertain = value.IsWrittenUncertainly(); const bool wasGoingToAssimilate = value.GoingToAssimilate; @@ -99,12 +106,16 @@ namespace NKikimr::NBlobDepot { #endif if (!inserted) { - EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { - const auto it = RefCount.find(id); - Y_ABORT_UNLESS(it != RefCount.end()); - if (!--it->second) { - deleteQ.push_back(id); + auto dropRefCount = [&](auto& map, auto&& key, auto& deleteQ) { + if (ui32& rc = --map[key]; !rc) { + deleteQ.push_back(std::move(key)); + } else { + Y_ABORT_UNLESS(rc != Max<ui32>()); } + }; + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), TOverloaded{ + [&](TLogoBlobID id, ui32, ui32) { dropRefCount(RefCountBlobs, id, deleteBlobs); }, + [&](TS3Locator locator) { dropRefCount(RefCountS3, locator, deleteS3); } }); } @@ -130,49 +141,101 @@ namespace NKikimr::NBlobDepot { (Outcome, outcomeToString()), (UnderSoft, underSoft), (Inserted, inserted), (Value, value), (UncertainWriteBefore, uncertainWriteBefore)); - EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { - const auto [it, inserted] = RefCount.try_emplace(id); + auto returnRefCount = [&](auto& map, auto&& key, auto& deleteQ) { + const auto [it, inserted] = map.try_emplace(std::move(key)); + if (outcome != EUpdateOutcome::DROP) { + ++it->second; + } else if (inserted) { + deleteQ.push_back(it->first); + } if (inserted) { - Y_ABORT_UNLESS(!CanBeCollected(TBlobSeqId::FromLogoBlobId(id))); - Y_VERIFY_DEBUG_S(id.Generation() == generation, "BlobId# " << id << " Generation# " << generation); - Y_VERIFY_DEBUG_S(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id), - "LeastExpectedBlobId# " << Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) - << " Id# " << id - << " Generation# " << generation); - AddFirstMentionedBlob(id); + AddFirstMentionedBlob(it->first); } - if (outcome == EUpdateOutcome::DROP) { - if (inserted) { - deleteQ.push_back(id); + return inserted; + }; + + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), TOverloaded{ + [&](TLogoBlobID id, ui32, ui32) { + if (returnRefCount(RefCountBlobs, id, deleteBlobs)) { + auto makeValueChain = [&] { + TStringStream s; + s << '{'; + for (bool first = true; const auto& item : value.ValueChain) { + s << (std::exchange(first, false) ? "" : " ") << SingleLineProto(item); + } + s << '}'; + return s.Str(); + }; + Y_VERIFY_S(!CanBeCollected(TBlobSeqId::FromLogoBlobId(id)), "BlobId# " << id + << " ValueChain# " << makeValueChain()); + Y_VERIFY_DEBUG_S(id.Generation() == generation, "BlobId# " << id << " Generation# " << generation); + Y_VERIFY_DEBUG_S(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id), + "LeastExpectedBlobId# " << Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) + << " Id# " << id + << " Generation# " << generation); } - } else { - ++it->second; + }, + [&](TS3Locator locator) { + returnRefCount(RefCountS3, locator, deleteS3); } }); - auto filter = [&](const TLogoBlobID& id) { - const auto it = RefCount.find(id); - Y_ABORT_UNLESS(it != RefCount.end()); + NIceDb::TNiceDb db(txc.DB); + + auto filterBlobs = [&](TLogoBlobID id) { + const auto it = RefCountBlobs.find(id); + Y_ABORT_UNLESS(it != RefCountBlobs.end()); if (it->second) { - return true; // remove this blob from deletion queue, it still has references - } else { - InFlightTrash.emplace(cookie, id); - InFlightTrashSize += id.BlobSize(); - Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize; - NIceDb::TNiceDb(txc.DB).Table<Schema::Trash>().Key(id.AsBinaryString()).Update(); - RefCount.erase(it); - TotalStoredDataSize -= id.BlobSize(); - Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_DATA_SIZE] = TotalStoredDataSize; - return false; // keep this blob in deletion queue + return true; } + RefCountBlobs.erase(it); + + const size_t size = id.BlobSize(); + InFlightTrashSize += size; + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize; + TotalStoredDataSize -= size; + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_DATA_SIZE] = TotalStoredDataSize; + + InFlightTrashBlobs.emplace(cookie, id); + db.Table<Schema::Trash>().Key(id.AsBinaryString()).Update(); + return false; // keep this blob in deletion queue }; - std::sort(deleteQ.begin(), deleteQ.end()); - deleteQ.erase(std::unique(deleteQ.begin(), deleteQ.end()), deleteQ.end()); - deleteQ.erase(std::remove_if(deleteQ.begin(), deleteQ.end(), filter), deleteQ.end()); - if (!deleteQ.empty()) { - UncertaintyResolver->DropBlobs(deleteQ); + + auto filterS3 = [&](TS3Locator locator) { + const auto it = RefCountS3.find(locator); + Y_ABORT_UNLESS(it != RefCountS3.end()); + if (it->second) { + return true; + } + + RefCountS3.erase(it); + TotalS3DataSize -= locator.Len; + + AddToS3Trash(locator, txc, cookie); + return false; // keep this blob in deletion queue + }; + + auto process = [&](auto& deleteQ, auto&& filterFunc) { + std::ranges::sort(deleteQ); + if (auto [first, last] = std::ranges::unique(deleteQ); first != last) { + deleteQ.erase(first, last); + } + if (auto [first, last] = std::ranges::remove_if(deleteQ, filterFunc); first != last) { + deleteQ.erase(first, last); + } + }; + + process(deleteBlobs, filterBlobs); + process(deleteS3, filterS3); + + if (!deleteBlobs.empty()) { + UncertaintyResolver->DropBlobs(deleteBlobs); } + auto& counters = Self->TabletCounters->Simple(); + counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_OBJECTS] = RefCountS3.size(); + counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_SIZE] = TotalS3DataSize; + auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::Data>().Key(key.MakeBinaryKey()); switch (outcome) { case EUpdateOutcome::DROP: @@ -196,6 +259,7 @@ namespace NKikimr::NBlobDepot { } if (wasUncertain && !value.IsWrittenUncertainly()) { UncertaintyResolver->MakeKeyCertain(key); + Self->S3Manager->OnKeyWritten(key, value.ValueChain); } if (wasGoingToAssimilate != value.GoingToAssimilate) { const i64 sign = value.GoingToAssimilate - wasGoingToAssimilate; @@ -223,15 +287,7 @@ namespace NKikimr::NBlobDepot { Y_ABORT_UNLESS(IsKeyLoaded(key)); UpdateKey(key, txc, cookie, "UpdateKey", [&](TValue& value, bool inserted) { if (!inserted) { // update value items - value.Meta = item.GetMeta(); - value.Public = false; - value.UncertainWrite = item.GetUncertainWrite(); - - // update it to keep new blob locator - value.ValueChain.Clear(); - auto *chain = value.ValueChain.Add(); - auto *locator = chain->MutableLocator(); - locator->CopyFrom(item.GetBlobLocator()); + value.UpdateFrom(item); ++value.ValueVersion; // clear assimilation flag -- we have blob overwritten with fresh copy (of the same data) @@ -257,7 +313,7 @@ namespace NKikimr::NBlobDepot { } if (value.ValueChain.empty()) { auto *chain = value.ValueChain.Add(); - auto *locator = chain->MutableLocator(); + auto *locator = chain->MutableBlobLocator(); locator->SetGroupId(Self->Info()->GroupFor(blobSeqId.Channel, blobSeqId.Generation)); blobSeqId.ToProto(locator->MutableBlobSeqId()); locator->SetTotalDataLen(key.GetBlobId().BlobSize()); @@ -270,6 +326,14 @@ namespace NKikimr::NBlobDepot { }); } + void TData::AddToS3Trash(TS3Locator locator, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + NIceDb::TNiceDb(txc.DB).Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId) + .Update<Schema::TrashS3::Len>(locator.Len); + InFlightTrashSize += locator.Len; + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize; + InFlightTrashS3.emplace(cookie, locator); + } + void TData::MakeKeyCertain(const TKey& key) { const auto it = Data.find(key); Y_ABORT_UNLESS(it != Data.end()); @@ -349,10 +413,15 @@ namespace NKikimr::NBlobDepot { // we can only add key that is not loaded before; if key exists, it MUST have been loaded from the dataset const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(proto), uncertainWrite); Y_ABORT_UNLESS(inserted); - EnumerateBlobsForValueChain(it->second.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { - if (!RefCount[id]++) { - AddFirstMentionedBlob(id); + + auto addRefCount = [&](auto& map, auto&& key) { + if (!map[key]++) { + AddFirstMentionedBlob(key); } + }; + EnumerateBlobsForValueChain(it->second.ValueChain, Self->TabletID(), TOverloaded{ + [&](TLogoBlobID id, ui32, ui32) { addRefCount(RefCountBlobs, id); }, + [&](TS3Locator locator) { addRefCount(RefCountS3, locator); } }); if (it->second.GoingToAssimilate) { Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_BYTES_TO_DECOMMIT] += it->first.GetBlobId().BlobSize(); @@ -536,6 +605,12 @@ namespace NKikimr::NBlobDepot { Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_DATA_SIZE] = TotalStoredDataSize; } + void TData::AddFirstMentionedBlob(TS3Locator locator) { + auto& counters = Self->TabletCounters->Simple(); + counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_OBJECTS] = RefCountS3.size(); + counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_SIZE] = TotalS3DataSize += locator.Len; + } + void TData::AccountBlob(TLogoBlobID id, bool add) { // account record STLOG(PRI_DEBUG, BLOB_DEPOT, BDT81, "AccountBlob", (Id, Self->GetLogId()), (BlobId, id), (Add, add)); @@ -687,21 +762,33 @@ namespace NKikimr::NBlobDepot { LastRecordsValidationTimestamp = now; TTabletStorageInfo *info = Self->Info(); - THashMap<TLogoBlobID, ui32> refcounts; + THashMap<TLogoBlobID, ui32> refcountBlobs; + THashMap<TS3Locator, ui32> refcountS3; for (const auto& [key, value] : Data) { - EnumerateBlobsForValueChain(value.ValueChain, info->TabletID, [&](TLogoBlobID id, ui32, ui32) { - ++refcounts[id]; + EnumerateBlobsForValueChain(value.ValueChain, info->TabletID, TOverloaded{ + [&](TLogoBlobID id, ui32, ui32) { + ++refcountBlobs[id]; + }, + [&](TS3Locator locator) { + ++refcountS3[locator]; + } }); } - Y_ABORT_UNLESS(RefCount == refcounts); + Y_ABORT_UNLESS(RefCountBlobs == refcountBlobs); + Y_ABORT_UNLESS(RefCountS3 == refcountS3); - for (const auto& [cookie, id] : InFlightTrash) { - const bool inserted = refcounts.try_emplace(id).second; + for (const auto& [cookie, id] : InFlightTrashBlobs) { + const bool inserted = refcountBlobs.try_emplace(id).second; + Y_ABORT_UNLESS(inserted); + } + + for (const auto& [cookie, locator] : InFlightTrashS3) { + const bool inserted = refcountS3.try_emplace(locator).second; Y_ABORT_UNLESS(inserted); } THashSet<std::tuple<ui8, ui32, TLogoBlobID>> used; - for (const auto& [id, count] : refcounts) { + for (const auto& [id, count] : refcountBlobs) { const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); used.emplace(id.Channel(), groupId, id); } @@ -716,6 +803,10 @@ namespace NKikimr::NBlobDepot { #endif } + bool TData::IsUseful(const TS3Locator& locator) const { + return RefCountS3.contains(locator); + } + } // NKikimr::NBlobDepot template<> diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 11dd7f12dd2..19e157428ea 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -141,6 +141,18 @@ namespace NKikimr::NBlobDepot { } } + TString MakeTextualKey() const { + if (Data.Type == BlobIdType) { + return GetBlobId().ToString(); + } else if (Data.Type <= MaxInlineStringLen || Data.Type == StringType) { + return TString(GetStringBuf()); + } else if (Data.Type == MinType) { + return {}; + } else { + Y_ABORT(); + } + } + static TKey FromBinaryKey(const TString& key, const NKikimrBlobDepot::TBlobDepotConfig& config) { if (config.HasVirtualGroupId()) { return TKey(TLogoBlobID::FromBinary(key)); @@ -283,14 +295,8 @@ namespace NKikimr::NBlobDepot { , UncertainWrite(uncertainWrite) {} - explicit TValue(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) - : Meta(item.GetMeta()) - , Public(false) - , UncertainWrite(item.GetUncertainWrite()) - { - auto *chain = ValueChain.Add(); - auto *locator = chain->MutableLocator(); - locator->CopyFrom(item.GetBlobLocator()); + explicit TValue(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) { + UpdateFrom(item); } explicit TValue(EKeepState keepState) @@ -299,6 +305,21 @@ namespace NKikimr::NBlobDepot { , UncertainWrite(false) {} + void UpdateFrom(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) { + Meta = item.GetMeta(); + Public = false; + UncertainWrite = item.GetUncertainWrite(); + + ValueChain.Clear(); + auto *chain = ValueChain.Add(); + if (item.HasBlobLocator()) { + chain->MutableBlobLocator()->CopyFrom(item.GetBlobLocator()); + } + if (item.HasS3Locator()) { + chain->MutableS3Locator()->CopyFrom(item.GetS3Locator()); + } + } + bool IsWrittenUncertainly() const { return UncertainWrite && !ValueChain.empty(); } @@ -420,17 +441,20 @@ namespace NKikimr::NBlobDepot { bool Loaded = false; std::map<TKey, TValue> Data; TClosedIntervalSet<TKey> LoadedKeys; // keys that are already scanned and loaded in the local database - THashMap<TLogoBlobID, ui32> RefCount; + THashMap<TLogoBlobID, ui32> RefCountBlobs; + THashMap<TS3Locator, ui32> RefCountS3; THashMap<std::tuple<ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; std::optional<TLogoBlobID> LastAssimilatedBlobId; THashSet<std::tuple<ui8, ui32>> AlreadyCutHistory; ui64 TotalStoredDataSize = 0; ui64 TotalStoredTrashSize = 0; ui64 InFlightTrashSize = 0; + ui64 TotalS3DataSize = 0; friend class TGroupAssimilator; - THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed + THashMultiMap<void*, TLogoBlobID> InFlightTrashBlobs; // being committed, but not yet confirmed + THashMultiMap<void*, TS3Locator> InFlightTrashS3; // being committed, but not yet confirmed class TTxIssueGC; class TTxConfirmGC; @@ -635,6 +659,8 @@ namespace NKikimr::NBlobDepot { void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, bool keep, bool doNotKeep, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void AddToS3Trash(TS3Locator locator, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void MakeKeyCertain(const TKey& key); void HandleCommitCertainKeys(); @@ -662,6 +688,7 @@ namespace NKikimr::NBlobDepot { void TrimChannelHistory(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted); void AddFirstMentionedBlob(TLogoBlobID id); + void AddFirstMentionedBlob(TS3Locator locator); void AccountBlob(TLogoBlobID id, bool add); bool CanBeCollected(TBlobSeqId id) const; @@ -670,7 +697,10 @@ namespace NKikimr::NBlobDepot { template<typename TCallback> void EnumerateRefCount(TCallback&& callback) { - for (const auto& [key, value] : RefCount) { + for (const auto& [key, value] : RefCountBlobs) { + callback(key, value); + } + for (const auto& [key, value] : RefCountS3) { callback(key, value); } } @@ -687,12 +717,20 @@ namespace NKikimr::NBlobDepot { void StartLoad(); bool LoadTrash(NTabletFlatExecutor::TTransactionContext& txc, TString& from, bool& progress); + bool LoadTrashS3(NTabletFlatExecutor::TTransactionContext& txc, TS3Locator& from, bool& progress); void OnLoadComplete(); bool IsLoaded() const { return Loaded; } bool IsKeyLoaded(const TKey& key) const { return Loaded || LoadedKeys[key]; } bool EnsureKeyLoaded(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc); + template<typename TRecord> + bool LoadMissingKeys(const TRecord& record, NTabletFlatExecutor::TTransactionContext& txc); + + std::optional<TString> CheckKeyAgainstBarrier(const TKey& key); + + bool IsUseful(const TS3Locator& locator) const; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TResolveDecommitActor; diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp index bca035fbcb9..25cb263c8fb 100644 --- a/ydb/core/blob_depot/data_gc.cpp +++ b/ydb/core/blob_depot/data_gc.cpp @@ -1,5 +1,6 @@ #include "data.h" #include "schema.h" +#include "garbage_collection.h" namespace NKikimr::NBlobDepot { @@ -141,4 +142,23 @@ namespace NKikimr::NBlobDepot { Self->Execute(std::make_unique<TTxHardGC>(Self, channel, groupId, hardGenStep)); } + std::optional<TString> TData::CheckKeyAgainstBarrier(const TKey& key) { + const auto& v = key.AsVariant(); + if (const auto *id = std::get_if<TLogoBlobID>(&v)) { + bool underSoft, underHard; + Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard); + if (underHard) { + return TStringBuilder() << "under hard barrier# " << Self->BarrierServer->ToStringBarrier( + id->TabletID(), id->Channel(), true); + } else if (underSoft) { + const TData::TValue *value = Self->Data->FindKey(key); + if (!value || value->KeepState != NKikimrBlobDepot::EKeepState::Keep) { + return TStringBuilder() << "under soft barrier# " << Self->BarrierServer->ToStringBarrier( + id->TabletID(), id->Channel(), false); + } + } + } + return std::nullopt; + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index 8cec6e3f570..113a869fd7c 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -2,6 +2,7 @@ #include "schema.h" #include "garbage_collection.h" #include "coro_tx.h" +#include "s3.h" namespace NKikimr::NBlobDepot { @@ -12,7 +13,7 @@ namespace NKikimr::NBlobDepot { bool progress = false; TString trash; - bool trashLoaded = false; + TS3Locator s3; TScanRange r{ .Begin = TKey::Min(), @@ -21,7 +22,8 @@ namespace NKikimr::NBlobDepot { .PrechargeBytes = 1'000'000, }; - while (!(trashLoaded = LoadTrash(*TCoroTx::GetTxc(), trash, progress)) || + while (!LoadTrash(*TCoroTx::GetTxc(), trash, progress) || + !LoadTrashS3(*TCoroTx::GetTxc(), s3, progress) || !ScanRange(r, TCoroTx::GetTxc(), &progress, [](const TKey&, const TValue&) { return true; })) { if (std::exchange(progress, false)) { TCoroTx::FinishTx(); @@ -61,6 +63,36 @@ namespace NKikimr::NBlobDepot { return true; } + bool TData::LoadTrashS3(NTabletFlatExecutor::TTransactionContext& txc, TS3Locator& from, bool& progress) { + NIceDb::TNiceDb db(txc.DB); + auto table = db.Table<Schema::TrashS3>().GreaterOrEqual(from.Generation, from.KeyId); + static constexpr ui64 PrechargeRows = 10'000; + static constexpr ui64 PrechargeBytes = 1'000'000; + if (!table.Precharge(PrechargeRows, PrechargeBytes)) { + return false; + } + auto rows = table.Select(); + if (!rows.IsReady()) { + return false; + } + while (rows.IsValid()) { + TS3Locator item{ + .Len = rows.GetValue<Schema::TrashS3::Len>(), + .Generation = rows.GetValue<Schema::TrashS3::Generation>(), + .KeyId = rows.GetValue<Schema::TrashS3::KeyId>(), + }; + if (item != from) { + Self->S3Manager->AddTrashToCollect(item); + from = item; + progress = true; + } + if (!rows.Next()) { + return false; + } + } + return true; + } + void TData::OnLoadComplete() { Self->Data->LoadedKeys([&](const TKey& left, const TKey& right) { // verify that LoadedKeys == {Min, Max} exactly @@ -111,6 +143,23 @@ namespace NKikimr::NBlobDepot { } } + template<typename TRecord> + bool TData::LoadMissingKeys(const TRecord& record, NTabletFlatExecutor::TTransactionContext& txc) { + if (IsLoaded()) { + return true; + } + for (const auto& item : record.GetItems()) { + auto key = TKey::FromBinaryKey(item.GetKey(), Self->Config); + if (!EnsureKeyLoaded(key, txc)) { + return false; + } + } + return true; + } + + template bool TData::LoadMissingKeys(const NKikimrBlobDepot::TEvCommitBlobSeq& record, NTabletFlatExecutor::TTransactionContext& txc); + template bool TData::LoadMissingKeys(const NKikimrBlobDepot::TEvPrepareWriteS3& record, NTabletFlatExecutor::TTransactionContext& txc); + void TBlobDepot::StartDataLoad() { Data->StartLoad(); } diff --git a/ydb/core/blob_depot/data_mon.cpp b/ydb/core/blob_depot/data_mon.cpp index 174ca8a53e6..351d6818f53 100644 --- a/ydb/core/blob_depot/data_mon.cpp +++ b/ydb/core/blob_depot/data_mon.cpp @@ -1,6 +1,7 @@ #include "data.h" #include "data_uncertain.h" #include "mon_main.h" +#include "s3.h" namespace NKikimr::NBlobDepot { @@ -16,9 +17,13 @@ namespace NKikimr::NBlobDepot { KEYVALUE_TABLE({ KEYVALUE_P("Loaded", Loaded ? "true" : "false"); KEYVALUE_P("Data size, number of keys", Data.size()); - KEYVALUE_P("RefCount size, number of blobs", RefCount.size()); + KEYVALUE_P("RefCount size, number of blobs", RefCountBlobs.size()); KEYVALUE_P("Total stored data size, bytes", FormatByteSize(TotalStoredDataSize)); KEYVALUE_P("Keys made certain, number of keys", KeysMadeCertain.size()); + KEYVALUE_P("Total number of useful S3 objects", RefCountS3.size()); + KEYVALUE_P("Total bytes in useful S3 objects", FormatByteSize(TotalS3DataSize)); + KEYVALUE_P("Total number of trash S3 objects", Self->S3Manager->GetTotalS3TrashObjects()); + KEYVALUE_P("Total bytes in trash S3 objects", FormatByteSize(Self->S3Manager->GetTotalS3TrashSize())); }) } } diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 0f743eda5cb..9dc71cd0a16 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -215,17 +215,23 @@ namespace NKikimr::NBlobDepot { LogoBlobIDFromLogoBlobID(key.GetBlobId(), out->MutableBlobId()); item.SetReliablyWritten(reliablyWritten); } else { - EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) { - if (begin != end) { - auto *out = item.AddValueChain(); - out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation())); - LogoBlobIDFromLogoBlobID(id, out->MutableBlobId()); - if (begin) { - out->SetSubrangeBegin(begin); - } - if (end != id.BlobSize()) { - out->SetSubrangeEnd(end); + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), TOverloaded { + [&](TLogoBlobID id, ui32 begin, ui32 end) { + if (begin != end) { + auto *out = item.AddValueChain(); + out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation())); + LogoBlobIDFromLogoBlobID(id, out->MutableBlobId()); + if (begin) { + out->SetSubrangeBegin(begin); + } + if (end != id.BlobSize()) { + out->SetSubrangeEnd(end); + } } + }, + [&](TS3Locator locator) { + auto *out = item.AddValueChain(); + locator.ToProto(out->MutableS3Locator()); } }); item.SetReliablyWritten(true); diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 1fca22e9165..802653c35f6 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -1,11 +1,12 @@ #include "data.h" +#include "s3.h" namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; void TData::CommitTrash(void *cookie) { - auto [first, last] = InFlightTrash.equal_range(cookie); + auto [first, last] = InFlightTrashBlobs.equal_range(cookie); std::unordered_set<TRecordsPerChannelGroup*> records; for (auto it = first; it != last; ++it) { auto& record = GetRecordsPerChannelGroup(it->second); @@ -13,12 +14,20 @@ namespace NKikimr::NBlobDepot { records.insert(&record); InFlightTrashSize -= it->second.BlobSize(); } - InFlightTrash.erase(first, last); - Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize; + InFlightTrashBlobs.erase(first, last); for (TRecordsPerChannelGroup *record : records) { record->CollectIfPossible(this); } + + auto [s3first, s3last] = InFlightTrashS3.equal_range(cookie); + for (auto it = s3first; it != s3last; ++it) { + Self->S3Manager->AddTrashToCollect(it->second); + InFlightTrashSize -= it->second.Len; + } + InFlightTrashS3.erase(s3first, s3last); + + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize; } void TData::HandleTrash(TRecordsPerChannelGroup& record) { diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp index 51fe983fffc..e5a8b5377de 100644 --- a/ydb/core/blob_depot/data_uncertain.cpp +++ b/ydb/core/blob_depot/data_uncertain.cpp @@ -29,8 +29,12 @@ namespace NKikimr::NBlobDepot { (Sender, entry->Result.GetSender()), (Cookie, entry->Result.GetCookie()), (Key, key)); // obtain list of blobs belonging to this key only once and here - EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { - keyContext.BlobState.emplace(id, std::make_tuple(EKeyBlobState::INITIAL, TString())); + EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), TOverloaded{ + [&](TLogoBlobID id, ui32, ui32) { + keyContext.BlobState.emplace(id, std::make_tuple(EKeyBlobState::INITIAL, TString())); + }, + [&](TS3Locator) { + } }); // try to process the blobs diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index aa001274f42..ca94809b648 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -28,6 +28,8 @@ namespace NKikimr { EvResolveResult, EvDiscardSpoiledBlobSeq, EvPushMetrics, + EvPrepareWriteS3, + EvPrepareWriteS3Result, }; #define BLOBDEPOT_PARAM_ARG(ARG) std::optional<std::decay_t<decltype(Record.Get##ARG())>> param##ARG, @@ -73,6 +75,8 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason); BLOBDEPOT_EVENT_PB_NO_ARGS(EvDiscardSpoiledBlobSeq); BLOBDEPOT_EVENT_PB(EvPushMetrics, BytesRead, BytesWritten); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvPrepareWriteS3); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvPrepareWriteS3Result); template<typename TEvent> struct TResponseFor {}; @@ -85,6 +89,7 @@ namespace NKikimr { template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; }; template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; }; template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; }; + template<> struct TResponseFor<TEvPrepareWriteS3> { using Type = TEvPrepareWriteS3Result; }; template<typename TRequestEvent, typename... TArgs> static auto MakeResponseFor(TEventHandle<TRequestEvent>& ev, TArgs&&... args) { @@ -106,6 +111,7 @@ namespace NKikimr { template<> struct TEventFor<NKikimrBlobDepot::TEvResolve> { using Type = TEvResolve; }; template<> struct TEventFor<NKikimrBlobDepot::TEvCommitBlobSeq> { using Type = TEvCommitBlobSeq; }; template<> struct TEventFor<NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq> { using Type = TEvDiscardSpoiledBlobSeq; }; + template<> struct TEventFor<NKikimrBlobDepot::TEvPrepareWriteS3> { using Type = TEvPrepareWriteS3; }; }; } // NKikimr diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index 0ea803bd861..6da0b21ea66 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -4,6 +4,7 @@ #include "blocks.h" #include "space_monitor.h" #include "mon_main.h" +#include "s3.h" namespace NKikimr::NBlobDepot { @@ -199,14 +200,18 @@ namespace NKikimr::NBlobDepot { key.Output(Stream); } TABLED() { - bool first = true; - for (const auto& item : value.ValueChain) { + for (bool first = true; const auto& item : value.ValueChain) { if (first) { first = false; } else { Stream << "<br/>"; } - Stream << TBlobSeqId::FromProto(item.GetLocator().GetBlobSeqId()).ToString(); + if (item.HasBlobLocator()) { + Stream << TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId()).ToString(); + } + if (item.HasS3Locator()) { + Stream << TS3Locator::FromProto(item.GetS3Locator()).ToString(); + } if (item.HasSubrangeBegin() || item.HasSubrangeEnd()) { Stream << "["; if (item.HasSubrangeBegin()) { @@ -242,9 +247,9 @@ namespace NKikimr::NBlobDepot { TABLEH() { Stream << "blob id"; } TABLEH() { Stream << "refcount"; } } else { - Self->Data->EnumerateRefCount([&](TLogoBlobID id, ui32 count) { + Self->Data->EnumerateRefCount([&](const auto& id, ui32 count) { TABLER() { - TABLED() { Stream << id; } + TABLED() { Stream << id.ToString(); } TABLED() { Stream << count; } } }); @@ -320,13 +325,19 @@ namespace NKikimr::NBlobDepot { auto *info = Self->Info(); std::map<ui32, std::tuple<ui64, ui64>> space; - Self->Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) { - const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); - auto& [current, total] = space[groupId]; - total += id.BlobSize(); - if (id.Generation() == generation) { - current += id.BlobSize(); - } + Self->Data->EnumerateRefCount([&](const auto& key, ui32) { + TOverloaded callback{ + [&](TLogoBlobID id) { + const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); + auto& [current, total] = space[groupId]; + total += id.BlobSize(); + if (id.Generation() == generation) { + current += id.BlobSize(); + } + }, + [&](TS3Locator) {} + }; + callback(key); }); for (const auto& [groupId, _] : Self->SpaceMonitor->Groups) { @@ -479,8 +490,12 @@ document.addEventListener("DOMContentLoaded", ready); ui64 total = 0; ui64 trashInFlight = 0; ui64 trashPending = 0; - Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) { - total += id.BlobSize(); + Data->EnumerateRefCount([&](const auto& key, ui32) { + TOverloaded callback{ + [&](TLogoBlobID id) { total += id.BlobSize(); }, + [&](TS3Locator) {} + }; + callback(key); }); Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) { (inFlight ? trashInFlight : trashPending) += id.BlobSize(); @@ -567,8 +582,12 @@ document.addEventListener("DOMContentLoaded", ready); ui64 total = 0; ui64 trashInFlight = 0; ui64 trashPending = 0; - Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) { - total += id.BlobSize(); + Data->EnumerateRefCount([&](const auto& key, ui32) { + TOverloaded callback{ + [&](TLogoBlobID id) { total += id.BlobSize(); }, + [&](TS3Locator) {} + }; + callback(key); }); Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) { (inFlight ? trashInFlight : trashPending) += id.BlobSize(); diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index e7aa4f6ec1f..be78a554e7f 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -1,8 +1,8 @@ #include "blob_depot_tablet.h" #include "schema.h" #include "data.h" -#include "garbage_collection.h" #include "blocks.h" +#include "s3.h" namespace NKikimr::NBlobDepot { @@ -30,7 +30,10 @@ namespace NKikimr::NBlobDepot { const auto& items = Request->Get()->Record.GetItems(); Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_PUTS_INCOMING] += items.size(); for (const auto& item : items) { - if (TData::TValue::Validate(item) && !item.GetCommitNotify()) { + if (!TData::TValue::Validate(item)) { + continue; + } + if (!item.GetCommitNotify() && item.HasBlobLocator()) { const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId()); if (Self->Data->CanBeCollected(blobSeqId)) { // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming @@ -54,12 +57,10 @@ namespace NKikimr::NBlobDepot { return true; } - if (!LoadMissingKeys(txc)) { + if (!Self->Data->LoadMissingKeys(Request->Get()->Record, txc)) { return false; } - NIceDb::TNiceDb db(txc.DB); - NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord; std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request); @@ -67,33 +68,70 @@ namespace NKikimr::NBlobDepot { for (const auto& item : Request->Get()->Record.GetItems()) { auto *responseItem = responseRecord->AddItems(); + + auto finishWithError = [&](NKikimrProto::EReplyStatus status, const TString& errorReason) { + responseItem->SetStatus(status); + responseItem->SetErrorReason(errorReason); + if (item.HasS3Locator()) { + const auto& locator = TS3Locator::FromProto(item.GetS3Locator()); + const size_t numErased = agent.S3WritesInFlight.erase(locator); + if (locator.Generation < generation) { + Y_ABORT_UNLESS(!numErased); + Self->Data->AddToS3Trash(locator, txc, this); + } else { + Y_ABORT_UNLESS(numErased == 1); + Self->S3Manager->AddTrashToCollect(locator); + } + } + }; + if (!TData::TValue::Validate(item)) { - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason("TEvCommitBlobSeq item protobuf is not valid"); + finishWithError(NKikimrProto::ERROR, "TEvCommitBlobSeq item protobuf is not valid"); continue; } - const auto& blobLocator = item.GetBlobLocator(); - const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId()); - if (FailedBlobSeqIds.contains(blobSeqId)) { - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason("couldn't start commit sequence for blob"); - continue; + bool canBeCollected = false; // can the just-written-blob be collected with GC logic? + + if (item.HasBlobLocator()) { + const auto& blobLocator = item.GetBlobLocator(); + + const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId()); + if (FailedBlobSeqIds.contains(blobSeqId)) { + finishWithError(NKikimrProto::ERROR, "couldn't start commit sequence for blob"); + continue; + } + + canBeCollected = Self->Data->CanBeCollected(blobSeqId); + + Y_VERIFY_DEBUG_S(canBeCollected || !CanBeCollectedBlobSeqIds.contains(blobSeqId), + "BlobSeqId# " << blobSeqId); } - responseItem->SetStatus(NKikimrProto::OK); + if (item.HasS3Locator()) { + const auto& locator = TS3Locator::FromProto(item.GetS3Locator()); + if (locator.Generation < generation) { + finishWithError(NKikimrProto::ERROR, "S3 locator is obsolete"); + continue; + } + } - const bool canBeCollected = Self->Data->CanBeCollected(blobSeqId); + responseItem->SetStatus(NKikimrProto::OK); auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); if (!item.GetCommitNotify()) { + bool blocksPass = true; if (const auto& v = key.AsVariant(); const auto *id = std::get_if<TLogoBlobID>(&v)) { - if (!Self->BlocksManager->CheckBlock(id->TabletID(), id->Generation())) { - // FIXME(alexvru): ExtraBlockChecks? - responseItem->SetStatus(NKikimrProto::BLOCKED); - responseItem->SetErrorReason("block race detected"); - continue; + blocksPass = Self->BlocksManager->CheckBlock(id->TabletID(), id->Generation()); + } + for (const auto& extra : item.GetExtraBlockChecks()) { + if (!blocksPass) { + break; } + blocksPass = Self->BlocksManager->CheckBlock(extra.GetTabletId(), extra.GetGeneration()); + } + if (!blocksPass) { + finishWithError(NKikimrProto::BLOCKED, "block race detected"); + continue; } } @@ -102,41 +140,51 @@ namespace NKikimr::NBlobDepot { if (canBeCollected) { // we can't accept this record, because it is potentially under already issued barrier - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason("generation race"); + finishWithError(NKikimrProto::ERROR, "generation race"); continue; } - Y_VERIFY_DEBUG_S(!CanBeCollectedBlobSeqIds.contains(blobSeqId), "BlobSeqId# " << blobSeqId); - - TString error; - if (!CheckKeyAgainstBarrier(key, &error)) { - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString() - << " is being put beyond the barrier: " << error); + if (auto error = Self->Data->CheckKeyAgainstBarrier(key)) { + finishWithError(NKikimrProto::ERROR, TStringBuilder() << "BlobId# " << key.ToString() + << " is being put beyond the barrier: " << *error); continue; } if (item.GetCommitNotify()) { if (item.GetUncertainWrite()) { - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason("UncertainWrite along with CommitNotify"); + finishWithError(NKikimrProto::ERROR, "UncertainWrite along with CommitNotify"); } else if (const TData::TValue *v = Self->Data->FindKey(key); v && v->SameValueChainAsIn(item)) { Self->Data->MakeKeyCertain(key); } else { // data race -- this value has been altered since it was previously written - responseItem->SetStatus(NKikimrProto::RACE); + finishWithError(NKikimrProto::RACE, "value has been altered since it was previously written"); } } else { - Y_VERIFY_DEBUG_S(AllowedBlobSeqIds.contains(blobSeqId), "BlobSeqId# " << blobSeqId); - Y_VERIFY_DEBUG_S( - Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation) <= blobSeqId, - "BlobSeqId# " << blobSeqId - << " LeastExpectedBlobId# " << Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation) - << " Generation# " << generation); - Y_VERIFY_DEBUG_S(blobSeqId.Generation == generation, "BlobSeqId# " << blobSeqId << " Generation# " << generation); - Y_VERIFY_DEBUG_S(Self->Channels[blobSeqId.Channel].SequenceNumbersInFlight.contains(blobSeqId.ToSequentialNumber()), - "BlobSeqId# " << blobSeqId); + if (item.HasBlobLocator()) { + const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId()); + Y_VERIFY_DEBUG_S(AllowedBlobSeqIds.contains(blobSeqId), "BlobSeqId# " << blobSeqId); + Y_VERIFY_DEBUG_S( + Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation) <= blobSeqId, + "BlobSeqId# " << blobSeqId + << " LeastExpectedBlobId# " << Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation) + << " Generation# " << generation); + Y_VERIFY_DEBUG_S(blobSeqId.Generation == generation, "BlobSeqId# " << blobSeqId << " Generation# " << generation); + Y_VERIFY_DEBUG_S(Self->Channels[blobSeqId.Channel].SequenceNumbersInFlight.contains(blobSeqId.ToSequentialNumber()), + "BlobSeqId# " << blobSeqId); + } + if (item.HasS3Locator()) { + auto locator = TS3Locator::FromProto(item.GetS3Locator()); + + // remove written item from the trash + NIceDb::TNiceDb(txc.DB).Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Delete(); + + // remove item from agent's inflight + const size_t numErased = agent.S3WritesInFlight.erase(locator); + Y_ABORT_UNLESS(numErased == 1); + + Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_PUTS_OK] += 1; + Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_PUTS_BYTES] += locator.Len; + } Self->Data->UpdateKey(key, item, txc, this); } } @@ -152,41 +200,6 @@ namespace NKikimr::NBlobDepot { return true; } - bool LoadMissingKeys(TTransactionContext& txc) { - NIceDb::TNiceDb db(txc.DB); - if (Self->Data->IsLoaded()) { - return true; - } - for (const auto& item : Request->Get()->Record.GetItems()) { - auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); - if (!Self->Data->EnsureKeyLoaded(key, txc)) { - return false; - } - } - return true; - } - - bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) { - const auto& v = key.AsVariant(); - if (const auto *id = std::get_if<TLogoBlobID>(&v)) { - bool underSoft, underHard; - Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard); - if (underHard) { - *error = TStringBuilder() << "under hard barrier# " << Self->BarrierServer->ToStringBarrier( - id->TabletID(), id->Channel(), true); - return false; - } else if (underSoft) { - const TData::TValue *value = Self->Data->FindKey(key); - if (!value || value->KeepState != NKikimrBlobDepot::EKeepState::Keep) { - *error = TStringBuilder() << "under soft barrier# " << Self->BarrierServer->ToStringBarrier( - id->TabletID(), id->Channel(), false); - return false; - } - } - } - return true; - } - void Complete(const TActorContext&) override { TAgent& agent = Self->GetAgent(NodeId); for (const TBlobSeqId blobSeqId : BlobSeqIds) { @@ -210,7 +223,9 @@ namespace NKikimr::NBlobDepot { // FIXME(alexvru): delete uncertain keys containing this BlobSeqId as they were never written - for (const auto& item : ev->Get()->Record.GetItems()) { + const auto& record = ev->Get()->Record; + + for (const auto& item : record.GetItems()) { const auto blobSeqId = TBlobSeqId::FromProto(item); if (blobSeqId.Generation == generation) { Y_ABORT_UNLESS(blobSeqId.Channel < Channels.size()); @@ -227,6 +242,14 @@ namespace NKikimr::NBlobDepot { } } } + + for (const auto& item : record.GetS3Locators()) { + if (const auto& locator = TS3Locator::FromProto(item); locator.Generation == generation) { + const size_t numErased = agent.S3WritesInFlight.erase(locator); + Y_ABORT_UNLESS(numErased == 1); + S3Manager->AddTrashToCollect(locator); + } + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/s3.cpp b/ydb/core/blob_depot/s3.cpp new file mode 100644 index 00000000000..72ea67b62ae --- /dev/null +++ b/ydb/core/blob_depot/s3.cpp @@ -0,0 +1,53 @@ +#include "s3.h" +#include <ydb/core/wrappers/s3_wrapper.h> + +namespace NKikimr::NBlobDepot { + + using TS3Manager = TBlobDepot::TS3Manager; + + TS3Manager::TS3Manager(TBlobDepot *self) + : Self(self) + {} + + TS3Manager::~TS3Manager() = default; + + void TS3Manager::Init(const NKikimrBlobDepot::TS3BackendSettings *settings) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS05, "Init", (Settings, settings)); + if (settings) { + ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings->GetSettings()); + WrapperId = Self->Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); + BasePath = TStringBuilder() << settings->GetSettings().GetObjectKeyPattern() << '/' << Self->Config.GetName(); + Bucket = settings->GetSettings().GetBucket(); + SyncMode = settings->HasSyncMode(); + AsyncMode = settings->HasAsyncMode(); + RunScannerActor(); + } else { + SyncMode = false; + AsyncMode = false; + } + } + + void TS3Manager::TerminateAllActors() { + for (TActorId actorId : ActiveUploaders) { + Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, Self->SelfId(), nullptr, 0)); + } + for (TActorId actorId : ActiveDeleters) { + Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, Self->SelfId(), nullptr, 0)); + } + if (ScannerActorId) { + Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, ScannerActorId, Self->SelfId(), nullptr, 0)); + } + } + + void TS3Manager::Handle(TAutoPtr<IEventHandle> ev) { + STRICT_STFUNC_BODY( + fFunc(TEvPrivate::EvDeleteResult, HandleDeleter) + fFunc(TEvPrivate::EvScanFound, HandleScanner) + ) + } + + void TBlobDepot::InitS3Manager() { + S3Manager->Init(Config.HasS3BackendSettings() ? &Config.GetS3BackendSettings() : nullptr); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/s3.h b/ydb/core/blob_depot/s3.h new file mode 100644 index 00000000000..9a18fb732dc --- /dev/null +++ b/ydb/core/blob_depot/s3.h @@ -0,0 +1,84 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" +#include "data.h" + +#include <ydb/core/wrappers/abstract.h> + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TS3Manager { + using TEvExternalStorage = NWrappers::TEvExternalStorage; + + TBlobDepot* const Self; + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + TActorId WrapperId; + TActorId UploaderId; + TString BasePath; + TString Bucket; + + bool SyncMode = false; + bool AsyncMode = false; + + ui64 NextKeyId = 1; + + THashSet<TActorId> ActiveUploaders; + + public: + TS3Manager(TBlobDepot *self); + ~TS3Manager(); + + void Init(const NKikimrBlobDepot::TS3BackendSettings *settings); + void TerminateAllActors(); + + void Handle(TAutoPtr<IEventHandle> ev); + + void OnKeyWritten(const TData::TKey& key, const TValueChain& valueChain); + + const TActorId& GetWrapperId() const { return WrapperId; } + + void AddTrashToCollect(TS3Locator locator); + + ui64 GetTotalS3TrashObjects() const { return TotalS3TrashObjects; } + ui64 GetTotalS3TrashSize() const { return TotalS3TrashSize; } + + private: /////////////////////////////////////////////////////////////////////////////////////////////////////////// + class TTxPrepareWriteS3; + friend class TBlobDepot; + + TS3Locator AllocateS3Locator(ui32 len); + + private: /////////////////////////////////////////////////////////////////////////////////////////////////////////// + class TUploaderActor; + struct TEvUploadResult; + + private: /////////////////////////////////////////////////////////////////////////////////////////////////////////// + class TScannerActor; + class TTxProcessScannedKeys; + struct TEvScanFound; + + TActorId ScannerActorId; + + void RunScannerActor(); + void HandleScanner(TAutoPtr<IEventHandle> ev); + + private: /////////////////////////////////////////////////////////////////////////////////////////////////////////// + class TDeleterActor; + class TTxDeleteTrashS3; + struct TEvDeleteResult; + + static constexpr ui32 MaxDeletesInFlight = 3; + static constexpr size_t MaxObjectsToDeleteAtOnce = 10; + + std::deque<TS3Locator> DeleteQueue; // items we are definitely going to delete (must be present in TrashS3) + THashSet<TActorId> ActiveDeleters; + ui32 NumDeleteTxInFlight = 0; + ui64 TotalS3TrashObjects = 0; + ui64 TotalS3TrashSize = 0; + + void RunDeletersIfNeeded(); + void HandleDeleter(TAutoPtr<IEventHandle> ev); + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/s3_delete.cpp b/ydb/core/blob_depot/s3_delete.cpp new file mode 100644 index 00000000000..5825d20c715 --- /dev/null +++ b/ydb/core/blob_depot/s3_delete.cpp @@ -0,0 +1,225 @@ +#include "s3.h" + +namespace NKikimr::NBlobDepot { + + using TS3Manager = TBlobDepot::TS3Manager; + + struct TS3Manager::TEvDeleteResult : TEventLocal<TEvDeleteResult, TEvPrivate::EvDeleteResult> { + std::vector<TS3Locator> LocatorsOk; + std::vector<TS3Locator> LocatorsError; + + TEvDeleteResult(std::vector<TS3Locator>&& locatorsOk, std::vector<TS3Locator>&& locatorsError) + : LocatorsOk(std::move(locatorsOk)) + , LocatorsError(std::move(locatorsError)) + {} + }; + + class TS3Manager::TDeleterActor : public TActor<TDeleterActor> { + TActorId ParentId; + THashMap<TString, TS3Locator> Locators; + TString LogId; + + public: + TDeleterActor(TActorId parentId, THashMap<TString, TS3Locator> locators, TString logId) + : TActor(&TThis::StateFunc) + , ParentId(parentId) + , Locators(locators) + , LogId(std::move(logId)) + {} + + void Handle(TEvExternalStorage::TEvDeleteObjectResponse::TPtr ev) { + auto& msg = *ev->Get(); + if (msg.IsSuccess()) { + Finish(std::nullopt); + } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) { + Finish(std::nullopt); + } else { + Finish(error.GetMessage().c_str()); + } + } + + void Handle(TEvExternalStorage::TEvDeleteObjectsResponse::TPtr ev) { + auto& msg = *ev->Get(); + + std::vector<TS3Locator> locatorsOk; + std::vector<TS3Locator> locatorsError; + + if (msg.IsSuccess()) { + auto& result = msg.Result.GetResult(); + for (const Aws::S3::Model::DeletedObject& deleted : result.GetDeleted()) { + if (deleted.KeyHasBeenSet()) { + if (const auto it = Locators.find(deleted.GetKey().c_str()); it != Locators.end()) { + locatorsOk.push_back(it->second); + Locators.erase(it); + } else { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS09, "key not found", (Id, LogId), + (Key, deleted.KeyHasBeenSet() ? std::make_optional<TString>(deleted.GetKey().c_str()) : std::nullopt)); + } + } else { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS10, "key not set", (Id, LogId)); + } + } + for (const Aws::S3::Model::Error& error : result.GetErrors()) { + if (error.KeyHasBeenSet() && error.GetCode() == "NoSuchKey") { // this key has already been deleted + if (const auto it = Locators.find(error.GetKey().c_str()); it != Locators.end()) { + locatorsOk.push_back(it->second); + Locators.erase(it); + } + } else { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS11, "failed to delete object from S3", (Id, LogId), + (Key, error.KeyHasBeenSet() ? std::make_optional<TString>(error.GetKey().c_str()) : std::nullopt), + (Error, error.GetMessage().c_str())); + } + } + } else { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS12, "failed to delete object(s) from S3", (Id, LogId), + (Error, msg.GetError().GetMessage().c_str())); + } + + for (const auto& [key, locator] : Locators) { + locatorsError.push_back(locator); + STLOG(PRI_WARN, BLOB_DEPOT, BDTS08, "failed to delete object from S3", (Id, LogId), (Locator, locator)); + } + + Send(ParentId, new TEvDeleteResult(std::move(locatorsOk), std::move(locatorsError))); + PassAway(); + } + + void HandleUndelivered() { + Finish("event undelivered"); + } + + void Finish(std::optional<TString> error) { + if (error) { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS03, "failed to delete object(s) from S3", (Id, LogId), (Locators, Locators), + (Error, error)); + } + std::vector<TS3Locator> locatorsOk; + std::vector<TS3Locator> locatorsError; + auto *target = error ? &locatorsError : &locatorsOk; + for (const auto& [key, locator] : Locators) { + target->push_back(locator); + } + Send(ParentId, new TEvDeleteResult(std::move(locatorsOk), std::move(locatorsError))); + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvExternalStorage::TEvDeleteObjectResponse, Handle) + hFunc(TEvExternalStorage::TEvDeleteObjectsResponse, Handle) + cFunc(TEvents::TSystem::Undelivered, HandleUndelivered) + cFunc(TEvents::TSystem::Poison, PassAway) + ) + }; + + class TS3Manager::TTxDeleteTrashS3 : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::vector<TS3Locator> Locators; + + public: + TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DELETE_TRASH_S3; } + + TTxDeleteTrashS3(TBlobDepot *self, std::vector<TS3Locator>&& locators) + : TTransactionBase(self) + , Locators(std::move(locators)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + for (const TS3Locator& locator : Locators) { + NIceDb::TNiceDb(txc.DB).Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Delete(); + } + return true; + } + + void Complete(const TActorContext&) override { + STLOG(PRI_INFO, BLOB_DEPOT, BDTS04, "TTxDeleteTrashS3 complete", (Id, Self->GetLogId()), (Locators, Locators)); + + size_t len = 0; + for (const TS3Locator& locator : Locators) { + len += locator.Len; + } + + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = + Self->S3Manager->TotalS3TrashObjects -= Locators.size(); + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = + Self->S3Manager->TotalS3TrashSize -= len; + + --Self->S3Manager->NumDeleteTxInFlight; + + Self->S3Manager->RunDeletersIfNeeded(); + } + }; + + void TS3Manager::AddTrashToCollect(TS3Locator locator) { + STLOG(PRI_INFO, BLOB_DEPOT, BDTS06, "AddTrashToCollect", (Id, Self->GetLogId()), (Locator, locator)); + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = ++TotalS3TrashObjects; + Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize += locator.Len; + DeleteQueue.push_back(locator); + RunDeletersIfNeeded(); + } + + void TS3Manager::RunDeletersIfNeeded() { + while (!DeleteQueue.empty() && NumDeleteTxInFlight + ActiveDeleters.size() < MaxDeletesInFlight) { + THashMap<TString, TS3Locator> locators; + + while (!DeleteQueue.empty() && locators.size() < MaxObjectsToDeleteAtOnce) { + const TS3Locator& locator = DeleteQueue.front(); + locators.emplace(locator.MakeObjectName(BasePath), locator); + DeleteQueue.pop_front(); + } + + const TActorId actorId = Self->Register(new TDeleterActor(Self->SelfId(), locators, Self->GetLogId())); + ActiveDeleters.insert(actorId); + + if (locators.size() == 1) { + TActivationContext::Send(new IEventHandle(WrapperId, actorId, + new TEvExternalStorage::TEvDeleteObjectRequest( + Aws::S3::Model::DeleteObjectRequest() + .WithBucket(Bucket) + .WithKey(locators.begin()->first) + ), + IEventHandle::FlagTrackDelivery + )); + } else { + auto del = Aws::S3::Model::Delete(); + for (const auto& [key, locator] : locators) { + del.AddObjects(Aws::S3::Model::ObjectIdentifier().WithKey(key)); + } + + TActivationContext::Send(new IEventHandle(WrapperId, actorId, + new TEvExternalStorage::TEvDeleteObjectsRequest(Aws::S3::Model::DeleteObjectsRequest() + .WithBucket(Bucket).WithDelete(std::move(del))), IEventHandle::FlagTrackDelivery)); + } + } + } + + void TS3Manager::HandleDeleter(TAutoPtr<IEventHandle> ev) { + STRICT_STFUNC_BODY( + hFunc(TEvDeleteResult, [&](TEvDeleteResult::TPtr ev) { + const size_t numErased = ActiveDeleters.erase(ev->Sender); + Y_ABORT_UNLESS(numErased == 1); + + auto& msg = *ev->Get(); + + Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_DELETES_OK] += msg.LocatorsOk.size(); + size_t len = 0; + for (const TS3Locator& locator : msg.LocatorsOk) { + len += locator.Len; + } + Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_DELETES_BYTES] += len; + + Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_DELETES_ERROR] += msg.LocatorsError.size(); + + if (!msg.LocatorsOk.empty()) { + Self->Execute(std::make_unique<TTxDeleteTrashS3>(Self, std::move(msg.LocatorsOk))); + ++NumDeleteTxInFlight; + } + + if (!msg.LocatorsError.empty()) { + DeleteQueue.insert(DeleteQueue.end(), msg.LocatorsError.begin(), msg.LocatorsError.end()); + RunDeletersIfNeeded(); + } + }) + ) + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/s3_scan.cpp b/ydb/core/blob_depot/s3_scan.cpp new file mode 100644 index 00000000000..f4dd43f190a --- /dev/null +++ b/ydb/core/blob_depot/s3_scan.cpp @@ -0,0 +1,216 @@ +#include "s3.h" + +namespace NKikimr::NBlobDepot { + + using TS3Manager = TBlobDepot::TS3Manager; + + struct TS3Manager::TEvScanFound : TEventLocal<TEvScanFound, TEvPrivate::EvScanFound> { + std::vector<std::tuple<TString, ui64>> KeysWithoutPrefix; + bool IsFinal; + std::optional<TString> Error; + + TEvScanFound(std::vector<std::tuple<TString, ui64>>&& keysWithoutPrefix, bool isFinal, std::optional<TString>&& error) + : KeysWithoutPrefix(std::move(keysWithoutPrefix)) + , IsFinal(isFinal) + , Error(std::move(error)) + {} + }; + + class TS3Manager::TScannerActor : public TActorBootstrapped<TScannerActor> { + const TActorId ParentId; + const TActorId WrapperId; + TString Prefix; + const TString Bucket; + const TString LogId; + std::optional<TString> Marker; + + public: + TScannerActor(TActorId parentId, TActorId wrapperId, TString prefix, TString bucket, TString logId) + : ParentId(parentId) + , WrapperId(wrapperId) + , Prefix(std::move(prefix)) + , Bucket(std::move(bucket)) + , LogId(std::move(logId)) + { + Prefix += '/'; + } + + void Bootstrap() { + IssueNextRequest(); + Become(&TThis::StateFunc); + } + + void IssueNextRequest() { + auto request = Aws::S3::Model::ListObjectsRequest() + .WithBucket(Bucket) + .WithPrefix(Prefix) + ; + if (Marker) { + request.SetMarker(*Marker); + } + request.SetMaxKeys(100); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS18, "TScannerActor::IssueNextRequest", (Id, LogId), (Prefix, Prefix), + (Marker, Marker)); + + Send(WrapperId, new TEvExternalStorage::TEvListObjectsRequest(request), IEventHandle::FlagTrackDelivery); + } + + void Handle(TEvExternalStorage::TEvListObjectsResponse::TPtr ev) { + auto& msg = *ev->Get(); + if (!msg.IsSuccess()) { + FinishWithError(msg.GetError().GetMessage().c_str()); + } else { + const auto& result = msg.Result.GetResult(); + TString lastKey; + std::vector<std::tuple<TString, ui64>> keysWithoutPrefix; + + for (const auto& item : result.GetContents()) { + if (!item.KeyHasBeenSet()) { + return FinishWithError("invalid response: no key set in listing"); + } else if (!item.SizeHasBeenSet()) { + return FinishWithError(TStringBuilder() << "invalid response: no size for key " << item.GetKey()); + } + lastKey = item.GetKey(); + if (!lastKey.StartsWith(Prefix)) { + return FinishWithError("returned key does not start with specified prefix"); + } + keysWithoutPrefix.emplace_back(lastKey.substr(Prefix.length()), item.GetSize()); + } + + const bool isFinal = !result.GetIsTruncated(); + + Send(ParentId, new TEvScanFound(std::move(keysWithoutPrefix), isFinal, std::nullopt)); + + if (isFinal) { + PassAway(); + } else { + Marker.emplace(std::move(lastKey)); + } + } + } + + void HandleUndelivered() { + FinishWithError("event undelivered"); + } + + void FinishWithError(TString error) { + Send(ParentId, new TEvScanFound({}, true, std::move(error))); + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvExternalStorage::TEvListObjectsResponse, Handle) + cFunc(TEvents::TSystem::Undelivered, HandleUndelivered) + cFunc(TEvPrivate::EvScanContinue, IssueNextRequest) + cFunc(TEvents::TSystem::Poison, PassAway) + ) + }; + + class TS3Manager::TTxProcessScannedKeys : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + THashSet<TS3Locator> UnprocessedLocators; + THashSet<TS3Locator> LocatorsToDelete; + + public: + TTxProcessScannedKeys(TBlobDepot *self, THashSet<TS3Locator>&& locators) + : TTransactionBase(self) + , UnprocessedLocators(std::move(locators)) + {} + + TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_PROCESS_SCANNED_KEYS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + for (auto it = UnprocessedLocators.begin(); it != UnprocessedLocators.end(); ) { + auto row = db.Table<Schema::TrashS3>().Key(it->Generation, it->KeyId).Select(); + if (!row.IsReady()) { + return false; + } else if (!row.IsValid()) { + const bool useful = Self->Data->IsUseful(*it); + if (useful) { + STLOG(PRI_CRIT, BLOB_DEPOT, BDTS13, "trying to delete useful S3 locator", (Id, Self->GetLogId()), + (Locator, *it)); + Y_DEBUG_ABORT("trying to delete useful S3 locator"); + } else { + LocatorsToDelete.insert(*it); + } + } + UnprocessedLocators.erase(it++); + } + + Y_ABORT_UNLESS(UnprocessedLocators.empty()); + + for (const TS3Locator& locator : LocatorsToDelete) { + db.Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Update<Schema::TrashS3::Len>(locator.Len); + } + + return true; + } + + void Complete(const TActorContext&) override { + for (const auto& locator : LocatorsToDelete) { + Self->S3Manager->AddTrashToCollect(locator); + } + if (const auto& actorId = Self->S3Manager->ScannerActorId) { + TActivationContext::Send(new IEventHandle(TEvPrivate::EvScanContinue, 0, actorId, Self->SelfId(), nullptr, 0)); + } + } + }; + + void TS3Manager::RunScannerActor() { + Y_ABORT_UNLESS(!ScannerActorId); + ScannerActorId = Self->Register(new TScannerActor(Self->SelfId(), WrapperId, BasePath, Bucket, Self->GetLogId())); + } + + void TS3Manager::HandleScanner(TAutoPtr<IEventHandle> ev) { + STRICT_STFUNC_BODY( + hFunc(TEvScanFound, [&](TEvScanFound::TPtr ev) { + auto& msg = *ev->Get(); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS17, "TEvScanFound received", (Id, Self->GetLogId()), + (IsFinal, msg.IsFinal), (Error, msg.Error), (KeysWithoutPrefix.size, msg.KeysWithoutPrefix.size())); + + Y_ABORT_UNLESS(ScannerActorId); + Y_ABORT_UNLESS(ev->Sender == ScannerActorId); + + if (msg.Error) { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS14, "scanner error", (Id, Self->GetLogId()), (Error, msg.Error)); + // TODO(alexvru): restart scanner in some time + } + + THashSet<TS3Locator> trash; + + const ui32 generation = Self->Executor()->Generation(); + + for (const auto& [key, len] : msg.KeysWithoutPrefix) { + TString error; + if (const auto& locator = TS3Locator::FromObjectName(key, len, &error)) { + const bool useful = Self->Data->IsUseful(*locator); + const bool allow = locator->Generation < generation; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS15, "TEvScanFound: found key", (Id, Self->GetLogId()), + (Locator, *locator), (Useful, useful), (Allow, allow), (Error, error)); + if (!useful && allow) { + trash.insert(*locator); + } + } else { + STLOG(PRI_WARN, BLOB_DEPOT, BDTS16, "TEvScanFound: incorrect key name", (Id, Self->GetLogId()), + (Key, key), (Len, len)); + } + } + + if (msg.IsFinal) { + ScannerActorId = {}; + } + + if (!trash.empty()) { + Self->Execute(std::make_unique<TTxProcessScannedKeys>(Self, std::move(trash))); + } else if (!msg.IsFinal) { + TActivationContext::Send(new IEventHandle(TEvPrivate::EvScanContinue, 0, ev->Sender, ev->Recipient, + nullptr, 0)); + } + }) + ) + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/s3_upload.cpp b/ydb/core/blob_depot/s3_upload.cpp new file mode 100644 index 00000000000..9877744cf68 --- /dev/null +++ b/ydb/core/blob_depot/s3_upload.cpp @@ -0,0 +1,95 @@ +#include "s3.h" + +namespace NKikimr::NBlobDepot { + + using TS3Manager = TBlobDepot::TS3Manager; + + struct TS3Manager::TEvUploadResult : TEventLocal<TEvUploadResult, TEvPrivate::EvUploadResult> { + }; + + class TS3Manager::TUploaderActor : public TActorBootstrapped<TUploaderActor> { + const TActorId WrapperId; + const TString BasePath; + const TString Bucket; + const TIntrusivePtr<TTabletStorageInfo> Info; + const TData::TKey Key; + const TValueChain ValueChain; + ui32 RepliesPending = 0; + TString Buffer; + + public: + TUploaderActor(TActorId wrapperId, TString basePath, TString bucket, TIntrusivePtr<TTabletStorageInfo> info, + TData::TKey key, TValueChain valueChain) + : WrapperId(wrapperId) + , BasePath(std::move(basePath)) + , Bucket(std::move(bucket)) + , Info(std::move(info)) + , Key(std::move(key)) + , ValueChain(std::move(valueChain)) + {} + + void Bootstrap() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS00, "TUploaderActor::Bootstrap", (Key, Key), (ValueChain, ValueChain)); + size_t targetOffset = 0; + EnumerateBlobsForValueChain(ValueChain, Info->TabletID, TOverloaded{ + [&](TLogoBlobID id, ui32 shift, ui32 size) { + const ui32 groupId = Info->GroupFor(id.Channel(), id.Generation()); + SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvGet(id, shift, size, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead), targetOffset); + ++RepliesPending; + targetOffset += size; + }, + [&](TS3Locator) { + // already stored in S3, nothing to do + } + }); + Buffer = TString::Uninitialized(targetOffset); + if (!RepliesPending) { + // don't have to read anything? + } + Become(&TThis::StateFunc); + } + + void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { + auto *msg = ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS01, "TUploaderActor::Handle(TEvGetResult)", (Msg, *msg)); + if (msg->Status == NKikimrProto::OK && msg->ResponseSz == 1 && msg->Responses->Status == NKikimrProto::OK) { + TRope& rope = msg->Responses->Buffer; + char *ptr = Buffer.Detach() + ev->Cookie; + for (auto it = rope.Begin(); it.Valid(); it.AdvanceToNextContiguousBlock()) { + memcpy(ptr, it.ContiguousData(), it.ContiguousSize()); + ptr += it.ContiguousSize(); + } + if (!--RepliesPending) { + auto request = Aws::S3::Model::PutObjectRequest() + .WithBucket(Bucket) + .WithKey(TStringBuilder() << BasePath << '/' << Key.MakeTextualKey()); + Send(WrapperId, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); + } + } else { + } + } + + void Handle(TEvExternalStorage::TEvPutObjectResponse::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS02, "TUploaderActor::Handle(TEvPutObjectResponse)", (Result, ev->Get()->Result)); + if (auto& result = ev->Get()->Result; result.IsSuccess()) { + } else { + } + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvBlobStorage::TEvGetResult, Handle) + hFunc(TEvExternalStorage::TEvPutObjectResponse, Handle) + cFunc(TEvents::TSystem::Poison, PassAway) + ) + }; + + void TS3Manager::OnKeyWritten(const TData::TKey& key, const TValueChain& valueChain) { + if (AsyncMode) { + ActiveUploaders.insert(Self->Register(new TUploaderActor(WrapperId, BasePath, Bucket, Self->Info(), key, + valueChain))); + } + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/s3_write.cpp b/ydb/core/blob_depot/s3_write.cpp new file mode 100644 index 00000000000..0914aa21e7c --- /dev/null +++ b/ydb/core/blob_depot/s3_write.cpp @@ -0,0 +1,118 @@ +#include "s3.h" +#include "blocks.h" + +namespace NKikimr::NBlobDepot { + + using TS3Manager = TBlobDepot::TS3Manager; + + class TS3Manager::TTxPrepareWriteS3 : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui32 NodeId; + const ui64 AgentInstanceId; + std::unique_ptr<TEvBlobDepot::TEvPrepareWriteS3::THandle> Request; + std::unique_ptr<IEventHandle> Response; + + public: + TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_PREPARE_WRITE_S3; } + + TTxPrepareWriteS3(TBlobDepot *self, TAgent& agent, std::unique_ptr<TEvBlobDepot::TEvPrepareWriteS3::THandle> request) + : TTransactionBase(self) + , NodeId(agent.Connection->NodeId) + , AgentInstanceId(*agent.AgentInstanceId) + , Request(std::move(request)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + TAgent& agent = Self->GetAgent(NodeId); + if (!agent.Connection || agent.AgentInstanceId != AgentInstanceId) { + // agent has been disconnected while transaction was in queue -- do nothing + return true; + } + + if (!Self->Data->LoadMissingKeys(Request->Get()->Record, txc)) { + // we haven't loaded all of the required keys + return false; + } + + NIceDb::TNiceDb db(txc.DB); + + NKikimrBlobDepot::TEvPrepareWriteS3Result *responseRecord; + std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request); + + for (const auto& record = Request->Get()->Record; const auto& item : record.GetItems()) { + auto *responseItem = responseRecord->AddItems(); + + TString error; + if (NKikimrProto::EReplyStatus status = CheckItem(item, error); status != NKikimrProto::OK) { + // basic checks have failed (blocked, item was deleted, or something else) + responseItem->SetStatus(status); + responseItem->SetErrorReason(error); + } else { + responseItem->SetStatus(NKikimrProto::OK); + + const TS3Locator locator = Self->S3Manager->AllocateS3Locator(item.GetLen()); + locator.ToProto(responseItem->MutableS3Locator()); + + // we put it here until operation is completed; if tablet restarts and operation fails, then this + // key will be deleted + db.Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Update<Schema::TrashS3::Len>(locator.Len); + + const bool inserted = agent.S3WritesInFlight.insert(locator).second; + Y_ABORT_UNLESS(inserted); + } + } + + return true; + } + + NKikimrProto::EReplyStatus CheckItem(const NKikimrBlobDepot::TEvPrepareWriteS3::TItem& item, TString& error) { + auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); + + bool blocksPass = std::visit(TOverloaded{ + [](TStringBuf) { return true; }, + [&](TLogoBlobID blobId) { return Self->BlocksManager->CheckBlock(blobId.TabletID(), blobId.Generation()); } + }, key.AsVariant()); + + for (const auto& extra : item.GetExtraBlockChecks()) { + if (!blocksPass) { + break; + } + blocksPass = Self->BlocksManager->CheckBlock(extra.GetTabletId(), extra.GetGeneration()); + } + + if (!blocksPass) { + error = "blocked"; + return NKikimrProto::BLOCKED; + } + + if (auto e = Self->Data->CheckKeyAgainstBarrier(key)) { + error = TStringBuilder() << "BlobId# " << key.ToString() << " is being put beyond the barrier: " << *e; + return NKikimrProto::ERROR; + } + + return NKikimrProto::OK; + } + + void Complete(const TActorContext&) override { + TActivationContext::Send(Response.release()); + } + }; + + TS3Locator TS3Manager::AllocateS3Locator(ui32 len) { + return { + .Len = len, + .Generation = Self->Executor()->Generation(), + .KeyId = NextKeyId++, + }; + } + + void TBlobDepot::Handle(TEvBlobDepot::TEvPrepareWriteS3::TPtr ev) { + auto& agent = GetAgent(ev->Recipient); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS07, "TEvPrepareWriteS3", (Id, GetLogId()), (AgentId, agent.Connection->NodeId), + (Msg, ev->Get()->Record)); + + Execute(std::make_unique<TS3Manager::TTxPrepareWriteS3>(this, agent, + std::unique_ptr<TEvBlobDepot::TEvPrepareWriteS3::THandle>(ev.Release()))); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index c19fe1a636e..2e655bc1d73 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -106,13 +106,25 @@ namespace NKikimr::NBlobDepot { using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>; }; + struct TrashS3 : Table<7> { + struct Generation : Column<1, NScheme::NTypeIds::Uint32> {}; + struct KeyId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct Len : Column<3, NScheme::NTypeIds::Uint32> {}; + + using TKey = TableKey<Generation, KeyId>; + using TColumns = TableColumns<Generation, KeyId, Len>; + + using Precharge = NoAutoPrecharge; + }; + using TTables = SchemaTables< Config, Blocks, Barriers, Data, Trash, - GC + GC, + TrashS3 >; using TSettings = SchemaSettings< diff --git a/ydb/core/blob_depot/testing.cpp b/ydb/core/blob_depot/testing.cpp index e94b873abb5..79758a7b7a2 100644 --- a/ydb/core/blob_depot/testing.cpp +++ b/ydb/core/blob_depot/testing.cpp @@ -41,13 +41,17 @@ namespace NKikimr::NBlobDepot { const TData::TValue *value = Data->FindKey(key); Y_ABORT_UNLESS(value); // key must exist ui32 numDataBytes = 0; - EnumerateBlobsForValueChain(value->ValueChain, TabletID(), [&](TLogoBlobID id, ui32, ui32 size) { - const ui32 groupId = Info()->GroupFor(id.Channel(), id.Generation()); - const auto state = overseer.GetBlobState(groupId, id); - Y_VERIFY_S(state == NTesting::EBlobState::CERTAINLY_WRITTEN, - "UserId# " << userId.ToString() << " UserState# " << (int)userState - << " Id# " << id.ToString() << " State# " << (int)state); - numDataBytes += size; + EnumerateBlobsForValueChain(value->ValueChain, TabletID(), TOverloaded { + [&](TLogoBlobID id, ui32, ui32 size) { + const ui32 groupId = Info()->GroupFor(id.Channel(), id.Generation()); + const auto state = overseer.GetBlobState(groupId, id); + Y_VERIFY_S(state == NTesting::EBlobState::CERTAINLY_WRITTEN, + "UserId# " << userId.ToString() << " UserState# " << (int)userState + << " Id# " << id.ToString() << " State# " << (int)state); + numDataBytes += size; + }, + [&](TS3Locator) { + } }); Y_ABORT_UNLESS(numDataBytes == userId.BlobSize()); break; diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index bbad5240829..ee53ec166e8 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -123,6 +123,82 @@ namespace NKikimr::NBlobDepot { } }; + struct TS3Locator { + ui32 Len = 0; + ui32 Generation = 0; + ui64 KeyId = 0; + + static TS3Locator FromProto(const NKikimrBlobDepot::TS3Locator& locator) { + return { + .Len = locator.GetLen(), + .Generation = locator.GetGeneration(), + .KeyId = locator.GetKeyId(), + }; + } + + void ToProto(NKikimrBlobDepot::TS3Locator *locator) const { + locator->SetLen(Len); + locator->SetGeneration(Generation); + locator->SetKeyId(KeyId); + } + + void Output(IOutputStream& s) const { + s << '{' << Len << '@' << Generation << '.' << KeyId << '}'; + } + + TString ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + + TString MakeObjectName(const TString& basePath) const { + const size_t hash = THash()(*this); + const size_t a = hash % 36; + const size_t b = hash / 36 % 36; + static const char vec[] = "0123456789abcdefghijklmnopqrstuvwxyz"; + return TStringBuilder() << basePath + << '/' << Generation + << '/' << vec[a] + << '/' << vec[b] + << '/' << KeyId; + } + + static std::optional<TS3Locator> FromObjectName(const TString& name, ui64 len, TString *error) { + try { + if (len > Max<ui32>()) { + *error = "value is too long"; + return std::nullopt; + } + ui32 generation; + ui64 keyId; + TString a, b; + Split(name, '/', generation, a, b, keyId); + TS3Locator res{ + .Len = static_cast<ui32>(len), + .Generation = generation, + .KeyId = keyId, + }; + if (res.MakeObjectName(TString()) != '/' + name) { + *error = "object name does not match"; + return std::nullopt; + } + return res; + } catch (const std::exception& ex) { + *error = ex.what(); + return std::nullopt; + } + } + + struct THash { + size_t operator ()(const TS3Locator& x) const { + return MultiHash(x.Len, x.Generation, x.KeyId); + } + }; + + friend std::strong_ordering operator <=>(const TS3Locator&, const TS3Locator&) = default; + }; + class TGivenIdRange { static constexpr size_t BitsPerChunk = 256; using TChunk = TBitMap<BitsPerChunk, ui64>; @@ -165,16 +241,21 @@ namespace NKikimr::NBlobDepot { template<typename TCallback> void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) { for (const auto& item : valueChain) { - const auto& locator = item.GetLocator(); - const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); - if (locator.GetFooterLen() == 0) { - callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); - } else if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) { - callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); - callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0); - } else { - callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() + - locator.GetFooterLen()), 0, locator.GetTotalDataLen()); + if (item.HasBlobLocator()) { + const auto& locator = item.GetBlobLocator(); + const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); + if (locator.GetFooterLen() == 0) { + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); + } else if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) { + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0); + } else { + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() + + locator.GetFooterLen()), 0, locator.GetTotalDataLen()); + } + } + if (item.HasS3Locator()) { + callback(TS3Locator::FromProto(item.GetS3Locator())); } } } @@ -216,3 +297,6 @@ namespace NKikimr::NBlobDepot { } while (false) } // NKikimr::NBlobDepot + +template<> struct THash<NKikimr::NBlobDepot::TS3Locator> : NKikimr::NBlobDepot::TS3Locator::THash {}; +template<> struct std::hash<NKikimr::NBlobDepot::TS3Locator> : THash<NKikimr::NBlobDepot::TS3Locator> {}; diff --git a/ydb/core/blob_depot/ya.make b/ydb/core/blob_depot/ya.make index 34642663934..42b8adf4d67 100644 --- a/ydb/core/blob_depot/ya.make +++ b/ydb/core/blob_depot/ya.make @@ -33,6 +33,12 @@ LIBRARY() group_metrics_exchange.cpp mon_main.cpp mon_main.h + s3.cpp + s3_delete.cpp + s3_scan.cpp + s3_upload.cpp + s3_write.cpp + s3.h space_monitor.cpp space_monitor.h testing.cpp @@ -49,6 +55,7 @@ LIBRARY() ydb/core/blobstorage/vdisk/common ydb/core/tablet_flat ydb/core/protos + ydb/core/wrappers ) GENERATE_ENUM_SERIALIZATION(schema.h) diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index c49349750e5..c6a6adc2683 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -95,6 +95,10 @@ namespace NKikimr::NBsController { NKikimrBlobDepot::TBlobDepotConfig config; config.SetVirtualGroupId(group->ID.GetRawId()); config.MutableChannelProfiles()->CopyFrom(cmd.GetChannelProfiles()); + if (cmd.HasS3BackendSettings()) { + config.MutableS3BackendSettings()->CopyFrom(cmd.GetS3BackendSettings()); + } + config.SetName(cmd.GetName()); const bool success = config.SerializeToString(&group->BlobDepotConfig.ConstructInPlace()); Y_ABORT_UNLESS(success); diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index c7fe27a8ad0..8f9c12cb033 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -25,10 +25,17 @@ message TBlobLocator { optional uint32 FooterLen = 5; } +message TS3Locator { + optional uint32 Len = 1; // length of contained data + optional uint32 Generation = 2; // generation of tablet issued this object into S3 + optional uint64 KeyId = 3; // key id unique within the generation +} + message TValueChain { - optional TBlobLocator Locator = 1; + optional TBlobLocator BlobLocator = 1; optional uint64 SubrangeBegin = 2; optional uint64 SubrangeEnd = 3; + optional TS3Locator S3Locator = 4; // if filled, then this blob is stored in S3 } message TValue { @@ -54,6 +61,7 @@ message TResolvedValueChain { optional NKikimrProto.TLogoBlobID BlobId = 2; optional uint32 SubrangeBegin = 3; optional uint32 SubrangeEnd = 4; + optional TS3Locator S3Locator = 5; } @@ -87,6 +95,8 @@ message TEvRegisterAgentResult { optional uint32 DecommitGroupId = 3; optional NKikimrBlobStorage.TPDiskSpaceColor.E SpaceColor = 4; optional float ApproximateFreeSpaceShare = 5; + optional NKikimrBlobDepot.TS3BackendSettings S3BackendSettings = 6; + optional string Name = 7; } message TEvAllocateIds { @@ -161,12 +171,19 @@ message TEvCollectGarbageResult { } message TEvCommitBlobSeq { + message TExtraBlockCheck { + optional uint64 TabletId = 1; + optional uint32 Generation = 2; + } + message TItem { optional TBlobLocator BlobLocator = 1; // GroupId and Generation are for validation purposes optional bytes Key = 2; optional bytes Meta = 3; optional bool UncertainWrite = 4; optional bool CommitNotify = 5; + optional TS3Locator S3Locator = 6; // if blob has been written to S3 + repeated TExtraBlockCheck ExtraBlockChecks = 7; } repeated TItem Items = 1; @@ -183,6 +200,7 @@ message TEvCommitBlobSeqResult { message TEvDiscardSpoiledBlobSeq { repeated TBlobSeqId Items = 1; + repeated TS3Locator S3Locators = 2; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -233,3 +251,28 @@ message TEvPushMetrics { optional uint64 BytesRead = 1; // since last update optional uint64 BytesWritten = 2; // since last update } + +message TEvPrepareWriteS3 { + message TExtraBlockCheck { + optional uint64 TabletId = 1; + optional uint32 Generation = 2; + } + + message TItem { + optional bytes Key = 1; // the key user is going to write + repeated TExtraBlockCheck ExtraBlockChecks = 2; + optional uint32 Len = 3; // length of the value + } + + repeated TItem Items = 1; +} + +message TEvPrepareWriteS3Result { + message TItem { + optional NKikimrProto.EReplyStatus Status = 1; + optional string ErrorReason = 2; + optional TS3Locator S3Locator = 3; + } + + repeated TItem Items = 1; +} diff --git a/ydb/core/protos/blob_depot_config.proto b/ydb/core/protos/blob_depot_config.proto index 0d63aec67a6..ab0ae975448 100644 --- a/ydb/core/protos/blob_depot_config.proto +++ b/ydb/core/protos/blob_depot_config.proto @@ -21,23 +21,34 @@ message TChannelProfile { message TS3BackendSettings { optional NKikimrSchemeOp.TS3Settings Settings = 1; // how to connect to S3 (mandatory field) - // data (when written) is first stored in ordinary data channel and then asynchronously transferred to S3 storage by - // BlobDepot tablet; amount of stored data can be controlled by following settings - // - // future operation may include modes when data is uploaded by agents directly to the storage + message TAsyncMode { + // data (when written) is first stored in ordinary data channel and then asynchronously transferred to S3 storage by + // BlobDepot tablet; amount of stored data can be controlled by following settings + // + // future operation may include modes when data is uploaded by agents directly to the storage - // when do we fully stop processing new queries until some data is uploaded - optional uint64 MaxPendingBytes = 2; + // when do we fully stop processing new queries until some data is uploaded + optional uint64 MaxPendingBytes = 2; - // when do we start throttling (if set, this must be strictly less than MaxPendingBytes, which also must be set) - optional uint64 ThrottleStartBytes = 3; + // when do we start throttling (if set, this must be strictly less than MaxPendingBytes, which also must be set) + optional uint64 ThrottleStartBytes = 3; - // upload rate (bytes per second) at ThrottleStartBytes amount of data (which linearly drops to 0 as amount increases - // up to MaxPendingBytes) - optional uint64 ThrottleMaxBytesPerSecond = 4; + // upload rate (bytes per second) at ThrottleStartBytes amount of data (which linearly drops to 0 as amount increases + // up to MaxPendingBytes) + optional uint64 ThrottleMaxBytesPerSecond = 4; - // number of concurrent puts - optional uint32 AsyncUploadPutsInFlight = 5 [default = 1]; + // number of concurrent puts + optional uint32 UploadPutsInFlight = 5 [default = 1]; + } + + message TSyncMode { + // data is written synchronously by agent + } + + oneof Mode { + TAsyncMode AsyncMode = 2; + TSyncMode SyncMode = 3; + } } message TBlobDepotConfig { @@ -49,4 +60,5 @@ message TBlobDepotConfig { optional NKikimrHive.TEvCreateTablet HiveParams = 6; // extra hive parameters optional uint64 TenantHiveId = 7; optional TS3BackendSettings S3BackendSettings = 8; // if set, then this tablet stores all its data in S3 (except for some cache/inflight) + optional string Name = 9; } diff --git a/ydb/core/protos/blobstorage_config.proto b/ydb/core/protos/blobstorage_config.proto index f1f3e71fab1..af6a8fa96d7 100644 --- a/ydb/core/protos/blobstorage_config.proto +++ b/ydb/core/protos/blobstorage_config.proto @@ -495,6 +495,7 @@ message TAllocateVirtualGroup { } repeated NKikimrBlobDepot.TChannelProfile ChannelProfiles = 5; uint64 BlobDepotId = 6; // when the tablet is already created; for testing purposes only + optional NKikimrBlobDepot.TS3BackendSettings S3BackendSettings = 8; // if S3 is used as a backend for this instance } message TDecommitGroups { diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto index 9b32efdbf94..81d0c6cc4e4 100644 --- a/ydb/core/protos/counters_blob_depot.proto +++ b/ydb/core/protos/counters_blob_depot.proto @@ -9,6 +9,10 @@ enum ESimpleCounters { COUNTER_TOTAL_STORED_TRASH_SIZE = 1 [(NKikimr.CounterOpts) = {Name: "TotalStoredTrashSize"}]; COUNTER_IN_FLIGHT_TRASH_SIZE = 2 [(NKikimr.CounterOpts) = {Name: "InFlightTrashSize"}]; COUNTER_BYTES_TO_DECOMMIT = 3 [(NKikimr.CounterOpts) = {Name: "BytesToDecommit"}]; + COUNTER_TOTAL_S3_DATA_OBJECTS = 4 [(NKikimr.CounterOpts) = {Name: "TotalS3DataObjects"}]; + COUNTER_TOTAL_S3_DATA_SIZE = 5 [(NKikimr.CounterOpts) = {Name: "TotalS3DataSize"}]; + COUNTER_TOTAL_S3_TRASH_OBJECTS = 6 [(NKikimr.CounterOpts) = {Name: "TotalS3TrashObjects"}]; + COUNTER_TOTAL_S3_TRASH_SIZE = 7 [(NKikimr.CounterOpts) = {Name: "TotalS3TrashSize"}]; } enum ECumulativeCounters { @@ -17,6 +21,12 @@ enum ECumulativeCounters { COUNTER_PUTS_ERROR = 2 [(NKikimr.CounterOpts) = {Name: "Puts/Error"}]; COUNTER_DECOMMIT_GET_BYTES = 3 [(NKikimr.CounterOpts) = {Name: "Decommit/GetBytes"}]; COUNTER_DECOMMIT_PUT_OK_BYTES = 4 [(NKikimr.CounterOpts) = {Name: "Decommit/PutOkBytes"}]; + COUNTER_S3_PUTS_OK = 5 [(NKikimr.CounterOpts) = {Name: "S3/Puts/Ok"}]; + COUNTER_S3_PUTS_ERROR = 6 [(NKikimr.CounterOpts) = {Name: "S3/Puts/Error"}]; + COUNTER_S3_PUTS_BYTES = 7 [(NKikimr.CounterOpts) = {Name: "S3/Puts/OkBytes"}]; + COUNTER_S3_DELETES_OK = 8 [(NKikimr.CounterOpts) = {Name: "S3/Deletes/Ok"}]; + COUNTER_S3_DELETES_ERROR = 9 [(NKikimr.CounterOpts) = {Name: "S3/Deletes/Error"}]; + COUNTER_S3_DELETES_BYTES = 10 [(NKikimr.CounterOpts) = {Name: "S3/Deletes/OkBytes"}]; } enum EPercentileCounters { @@ -61,4 +71,7 @@ enum ETxTypes { TXTYPE_COMMIT_BLOB_SEQ = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}]; TXTYPE_UPDATE_BLOCK = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}]; TXTYPE_HARD_GC = 16 [(NKikimr.TxTypeOpts) = {Name: "TTxHardGC"}]; + TXTYPE_PREPARE_WRITE_S3 = 17 [(NKikimr.TxTypeOpts) = {Name: "TTxPrepareWriteS3"}]; + TXTYPE_DELETE_TRASH_S3 = 18 [(NKikimr.TxTypeOpts) = {Name: "TTxDeleteTrashS3"}]; + TXTYPE_PROCESS_SCANNED_KEYS = 19 [(NKikimr.TxTypeOpts) = {Name: "TTxProcessScannedKeys"}]; } diff --git a/ydb/core/util/stlog.h b/ydb/core/util/stlog.h index d35fefc4e9c..5cce2fd8420 100644 --- a/ydb/core/util/stlog.h +++ b/ydb/core/util/stlog.h @@ -104,6 +104,12 @@ namespace NKikimr::NStLog { template<typename T> struct TIsIterable<NProtoBuf::RepeatedField<T>> { static constexpr bool value = true; }; template<typename T> struct TIsIterable<NProtoBuf::RepeatedPtrField<T>> { static constexpr bool value = true; }; + template<typename T> struct TIsIterableKV { static constexpr bool value = false; }; + template<typename... Ts> struct TIsIterableKV<THashMap<Ts...>> { static constexpr bool value = true; }; + template<typename... Ts> struct TIsIterableKV<TMap<Ts...>> { static constexpr bool value = true; }; + template<typename... Ts> struct TIsIterableKV<std::map<Ts...>> { static constexpr bool value = true; }; + template<typename... Ts> struct TIsIterableKV<std::unordered_map<Ts...>> { static constexpr bool value = true; }; + template<typename T> struct TIsIdWrapper { static constexpr bool value = false; }; template<typename TType, typename TTag> struct TIsIdWrapper<TIdWrapper<TType, TTag>> { static constexpr bool value = true; }; @@ -180,6 +186,19 @@ namespace NKikimr::NStLog { OutputParam(s, *begin); } s << "]"; + } else if constexpr (TIsIterableKV<Tx>::value) { + s << '{'; + for (bool first = true; const auto& [k, v] : value) { + if (first) { + first = false; + } else { + s << ' '; + } + OutputParam(s, k); + s << ':'; + OutputParam(s, v); + } + s << '}'; } else { s << value; } @@ -212,6 +231,17 @@ namespace NKikimr::NStLog { OutputParam(json, *begin); } json.CloseArray(); + } else if constexpr (TIsIterableKV<Tx>::value) { + json.OpenArray(); + for (const auto& [k, v] : value) { + json.OpenMap(); + json.WriteKey("key"); + OutputParam(json, k); + json.WriteKey("value"); + OutputParam(json, v); + json.CloseMap(); + } + json.CloseArray(); } else if constexpr (TIsIdWrapper<Tx>::value){ json.Write(value.GetRawId()); } else if constexpr (std::is_constructible_v<NJson::TJsonValue, Tx>) { |