aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2025-03-05 14:50:19 +0300
committerGitHub <noreply@github.com>2025-03-05 11:50:19 +0000
commit18c06581a73311b11d3008b1bccd606147e19da0 (patch)
tree441eb544a8086acf74cd79680c8f00082ae3d587
parent9908e6c91516d60ca5b96ef2f14722ca93366763 (diff)
downloadydb-18c06581a73311b11d3008b1bccd606147e19da0.tar.gz
Add S3 support into BlobDepot (#14979)
-rw-r--r--ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py12
-rw-r--r--ydb/core/blob_depot/agent.cpp15
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h14
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp21
-rw-r--r--ydb/core/blob_depot/agent/read.cpp117
-rw-r--r--ydb/core/blob_depot/agent/request.cpp1
-rw-r--r--ydb/core/blob_depot/agent/resolved_value.cpp32
-rw-r--r--ydb/core/blob_depot/agent/resolved_value.h9
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp172
-rw-r--r--ydb/core/blob_depot/agent/ya.make1
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp12
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h16
-rw-r--r--ydb/core/blob_depot/data.cpp243
-rw-r--r--ydb/core/blob_depot/data.h60
-rw-r--r--ydb/core/blob_depot/data_gc.cpp20
-rw-r--r--ydb/core/blob_depot/data_load.cpp53
-rw-r--r--ydb/core/blob_depot/data_mon.cpp7
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp26
-rw-r--r--ydb/core/blob_depot/data_trash.cpp15
-rw-r--r--ydb/core/blob_depot/data_uncertain.cpp8
-rw-r--r--ydb/core/blob_depot/events.h6
-rw-r--r--ydb/core/blob_depot/mon_main.cpp51
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp177
-rw-r--r--ydb/core/blob_depot/s3.cpp53
-rw-r--r--ydb/core/blob_depot/s3.h84
-rw-r--r--ydb/core/blob_depot/s3_delete.cpp225
-rw-r--r--ydb/core/blob_depot/s3_scan.cpp216
-rw-r--r--ydb/core/blob_depot/s3_upload.cpp95
-rw-r--r--ydb/core/blob_depot/s3_write.cpp118
-rw-r--r--ydb/core/blob_depot/schema.h14
-rw-r--r--ydb/core/blob_depot/testing.cpp18
-rw-r--r--ydb/core/blob_depot/types.h104
-rw-r--r--ydb/core/blob_depot/ya.make7
-rw-r--r--ydb/core/mind/bscontroller/virtual_group.cpp4
-rw-r--r--ydb/core/protos/blob_depot.proto45
-rw-r--r--ydb/core/protos/blob_depot_config.proto38
-rw-r--r--ydb/core/protos/blobstorage_config.proto1
-rw-r--r--ydb/core/protos/counters_blob_depot.proto13
-rw-r--r--ydb/core/util/stlog.h30
39 files changed, 1884 insertions, 269 deletions
diff --git a/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py b/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py
index 061e37b58c1..54bc3aca8ac 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py
@@ -2,6 +2,8 @@ import ydb.apps.dstool.lib.common as common
import ydb.core.protos.blob_depot_config_pb2 as blob_depot_config
import sys
import time
+from argparse import FileType
+import google.protobuf.json_format as pb_json
description = 'Create virtual group backed by BlobDepot'
@@ -17,11 +19,18 @@ def add_options(p):
p.add_argument('--log-channel-sp', type=str, metavar='POOL_NAME', required=True, help='channel 0 specifier')
p.add_argument('--snapshot-channel-sp', type=str, metavar='POOL_NAME', help='channel 1 specifier (defaults to channel 0)')
p.add_argument('--data-channel-sp', type=str, metavar='POOL_NAME[*COUNT]', nargs='+', required=True, help='data channel specifier')
+ p.add_argument('--s3-settings', type=FileType('r', encoding='utf-8'), metavar='JSON_FILE', help='path to JSON file containing S3 settings')
p.add_argument('--wait', action='store_true', help='wait for operation to complete by polling')
def do(args):
request = common.create_bsc_request(args)
+
+ if args.s3_settings:
+ s3_settings = args.s3_settings.read()
+ else:
+ s3_settings = None
+
for name in args.name:
cmd = request.Command.add().AllocateVirtualGroup
@@ -41,6 +50,9 @@ def do(args):
print(f'Invalid --storage-pool-id={args.storage_pool_id} format, <number>:<number> expected', file=sys.stderr)
sys.exit(1)
+ if s3_settings is not None:
+ pb_json.Parse(s3_settings, cmd.S3BackendSettings)
+
cmd.ChannelProfiles.add(StoragePoolName=args.log_channel_sp, ChannelKind=blob_depot_config.TChannelKind.System)
chan1 = args.snapshot_channel_sp if args.snapshot_channel_sp is not None else args.log_channel_sp
cmd.ChannelProfiles.add(StoragePoolName=chan1, ChannelKind=blob_depot_config.TChannelKind.System)
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 552e53213da..b925d66fcdf 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -1,6 +1,7 @@
#include "blob_depot_tablet.h"
#include "data.h"
#include "space_monitor.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
@@ -30,6 +31,12 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::OnAgentDisconnect(TAgent& agent) {
agent.InvalidateStepRequests.clear();
agent.PushCallbacks.clear();
+
+ for (TS3Locator locator : agent.S3WritesInFlight) {
+ // they were not in InFlightTrashS3, so we just have to delete them
+ S3Manager->AddTrashToCollect(locator);
+ }
+ agent.S3WritesInFlight.clear();
}
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
@@ -79,6 +86,14 @@ namespace NKikimr::NBlobDepot {
record->SetDecommitGroupId(Config.GetVirtualGroupId());
}
+ if (Config.HasS3BackendSettings()) {
+ record->MutableS3BackendSettings()->CopyFrom(Config.GetS3BackendSettings());
+ }
+
+ if (Config.HasName()) {
+ record->SetName(Config.GetName());
+ }
+
TActivationContext::Send(response.release());
if (!agent.InvalidatedStepInFlight.empty()) {
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index e4edfdaa237..6adf340e63a 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -5,6 +5,8 @@
#include <ydb/core/protos/blob_depot_config.pb.h>
+#include <ydb/core/wrappers/abstract.h>
+
namespace NKikimr::NBlobDepot {
#define ENUMERATE_INCOMING_EVENTS(XX) \
@@ -129,6 +131,7 @@ namespace NKikimr::NBlobDepot {
TEvBlobDepot::TEvCollectGarbageResult*,
TEvBlobDepot::TEvCommitBlobSeqResult*,
TEvBlobDepot::TEvResolveResult*,
+ TEvBlobDepot::TEvPrepareWriteS3Result*,
// underlying DS proxy responses
TEvBlobStorage::TEvGetResult*,
@@ -233,6 +236,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvPrepareWriteS3Result, HandleTabletResponse);
hFunc(TEvBlobStorage::TEvGetResult, HandleOtherResponse);
hFunc(TEvBlobStorage::TEvPutResult, HandleOtherResponse);
@@ -255,6 +259,9 @@ namespace NKikimr::NBlobDepot {
void PassAway() override {
ClearPendingEventQueue("BlobDepot agent destroyed");
NTabletPipe::CloseAndForgetClient(SelfId(), PipeId);
+ if (S3WrapperId) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0));
+ }
TActor::PassAway();
}
@@ -318,6 +325,11 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColor = {};
float ApproximateFreeSpaceShare = 0.0f;
+ NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
+ std::optional<NKikimrBlobDepot::TS3BackendSettings> S3BackendSettings;
+ TActorId S3WrapperId;
+ TString S3BasePath;
+
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
void ConnectToBlobDepot();
@@ -447,6 +459,8 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
void HandleQueryWatchdog();
+ void Invoke(std::function<void()> callback) { callback(); }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct TChannelKind
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 1b92ee886ad..82abae69791 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -1,6 +1,8 @@
#include "agent_impl.h"
#include "blocks.h"
+#include <ydb/core/wrappers/s3_wrapper.h>
+
namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
@@ -86,6 +88,24 @@ namespace NKikimr::NBlobDepot {
SpaceColor = msg.GetSpaceColor();
ApproximateFreeSpaceShare = msg.GetApproximateFreeSpaceShare();
+ S3BackendSettings = msg.HasS3BackendSettings()
+ ? std::make_optional(msg.GetS3BackendSettings())
+ : std::nullopt;
+
+ if (S3WrapperId) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0));
+ S3WrapperId = {};
+ }
+
+ if (S3BackendSettings) {
+ auto& settings = S3BackendSettings->GetSettings();
+ ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings);
+ S3WrapperId = Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+ S3BasePath = TStringBuilder() << settings.GetObjectKeyPattern() << '/' << msg.GetName();
+ } else {
+ ExternalStorageConfig = {};
+ }
+
OnConnect();
}
@@ -171,6 +191,7 @@ namespace NKikimr::NBlobDepot {
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCommitBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context);
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context);
+ template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvPrepareWriteS3 msg, TRequestSender *sender, TRequestContext::TPtr context);
ui64 TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) {
const ui64 id = NextTabletRequestId++;
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index 9b1e5519955..653ef5f5252 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -72,25 +72,22 @@ namespace NKikimr::NBlobDepot {
ui32 Size;
ui64 OutputOffset;
};
+ struct TS3ReadItem {
+ TString Key;
+ ui32 Offset;
+ ui32 Size;
+ ui64 OutputOffset;
+ };
std::vector<TReadItem> items;
+ std::vector<TS3ReadItem> s3items;
ui64 offset = arg.Offset;
ui64 size = arg.Size;
for (const auto& value : arg.Value.Chain) {
- const ui32 groupId = value.GroupId;
- const auto& blobId = value.BlobId;
const ui32 begin = value.SubrangeBegin;
const ui32 end = value.SubrangeEnd;
- if (end <= begin || blobId.BlobSize() < end) {
- error = "incorrect SubrangeBegin/SubrangeEnd pair";
- STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
- (Value, arg.Value));
- return false;
- }
-
// calculate the whole length of current part
ui64 partLen = end - begin;
if (offset >= partLen) {
@@ -103,7 +100,26 @@ namespace NKikimr::NBlobDepot {
partLen = Min(size ? size : Max<ui64>(), partLen - offset);
Y_ABORT_UNLESS(partLen);
- items.push_back(TReadItem{groupId, blobId, ui32(offset + begin), ui32(partLen), outputOffset});
+ ui32 itemLen = 0;
+ ui32 partOffset = offset + begin;
+
+ if (value.Blob) {
+ const auto& [blobId, groupId] = *value.Blob;
+ items.push_back(TReadItem{groupId, blobId, partOffset, ui32(partLen), outputOffset});
+ itemLen = blobId.BlobSize();
+ } else if (const auto& locator = value.S3Locator) {
+ TString key = locator->MakeObjectName(Agent.S3BasePath);
+ s3items.push_back(TS3ReadItem{std::move(key), partOffset, ui32(partLen), outputOffset});
+ itemLen = locator->Len;
+ }
+
+ if (end <= begin || itemLen < end) {
+ error = "incorrect SubrangeBegin/SubrangeEnd pair";
+ STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
+ (Value, arg.Value));
+ return false;
+ }
outputOffset += partLen;
offset = 0;
@@ -156,6 +172,85 @@ namespace NKikimr::NBlobDepot {
++context->NumPartsPending;
}
+ for (TS3ReadItem& item : s3items) {
+ class TGetActor : public TActor<TGetActor> {
+ size_t OutputOffset;
+ std::shared_ptr<TReadContext> ReadContext;
+ TQuery* const Query;
+
+ TString AgentLogId;
+ TString QueryId;
+ ui64 ReadId;
+
+ public:
+ TGetActor(size_t outputOffset, std::shared_ptr<TReadContext> readContext, TQuery *query)
+ : TActor(&TThis::StateFunc)
+ , OutputOffset(outputOffset)
+ , ReadContext(std::move(readContext))
+ , Query(query)
+ , AgentLogId(query->Agent.LogId)
+ , QueryId(query->GetQueryId())
+ , ReadId(ReadContext->GetTag())
+ {}
+
+ void Handle(NWrappers::TEvExternalStorage::TEvGetObjectResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA55, "received TEvGetObjectResponse",
+ (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId),
+ (Response, msg.Result), (BodyLen, std::size(msg.Body)));
+
+ if (msg.IsSuccess()) {
+ Finish(std::move(msg.Body), "");
+ } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
+ Finish(std::nullopt, "data has disappeared from S3");
+ } else {
+ Finish(std::nullopt, msg.GetError().GetMessage().c_str());
+ }
+ }
+
+ void HandleUndelivered() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA56, "received TEvUndelivered",
+ (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId));
+ Finish(std::nullopt, "wrapper actor terminated");
+ }
+
+ void Finish(std::optional<TString> data, const char *error) {
+ auto& context = *ReadContext;
+ if (!context.Terminated && !context.StopProcessingParts) {
+ if (data) {
+ context.Buffer.Write(OutputOffset, TRope(std::move(*data)));
+ if (!--context.NumPartsPending) {
+ context.EndWithSuccess(Query);
+ }
+ } else {
+ context.EndWithError(Query, NKikimrProto::ERROR, TStringBuilder()
+ << "failed to fetch data from S3: " << error);
+ }
+ }
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NWrappers::TEvExternalStorage::TEvGetObjectResponse, Handle)
+ cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA57, "starting S3 read", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, context->GetTag()), (Key, item.Key), (Offset, item.Offset), (Size, item.Size),
+ (OutputOffset, item.OutputOffset));
+ const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(item.OutputOffset, context, this));
+ auto request = std::make_unique<NWrappers::TEvExternalStorage::TEvGetObjectRequest>(
+ Aws::S3::Model::GetObjectRequest()
+ .WithBucket(Agent.S3BackendSettings->GetSettings().GetBucket())
+ .WithKey(std::move(item.Key))
+ .WithRange(TStringBuilder() << "bytes=" << item.Offset << '-' << item.Offset + item.Size - 1)
+ );
+ TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, actorId, request.release(), IEventHandle::FlagTrackDelivery));
+ ++context->NumPartsPending;
+ }
+
Y_ABORT_UNLESS(context->NumPartsPending);
return true;
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
index 6527bf6f7e2..8af4498e93d 100644
--- a/ydb/core/blob_depot/agent/request.cpp
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -97,6 +97,7 @@ namespace NKikimr::NBlobDepot {
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvPrepareWriteS3Result::TPtr ev);
template<typename TEvent>
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
diff --git a/ydb/core/blob_depot/agent/resolved_value.cpp b/ydb/core/blob_depot/agent/resolved_value.cpp
index 32af8762ad4..b32e6c30c0c 100644
--- a/ydb/core/blob_depot/agent/resolved_value.cpp
+++ b/ydb/core/blob_depot/agent/resolved_value.cpp
@@ -3,16 +3,36 @@
namespace NKikimr::NBlobDepot {
TResolvedValue::TLink::TLink(const NKikimrBlobDepot::TResolvedValueChain& link)
- : BlobId(LogoBlobIDFromLogoBlobID(link.GetBlobId()))
- , GroupId(link.GetGroupId())
- , SubrangeBegin(link.GetSubrangeBegin())
- , SubrangeEnd(link.HasSubrangeEnd() ? link.GetSubrangeEnd() : BlobId.BlobSize())
+ : SubrangeBegin(link.GetSubrangeBegin())
+ , SubrangeEnd(link.GetSubrangeEnd())
{
- Y_DEBUG_ABORT_UNLESS(link.HasBlobId() && link.HasGroupId());
+ std::optional<ui32> length;
+ if (link.HasBlobId() && link.HasGroupId()) {
+ const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(link.GetBlobId());
+ Blob.emplace(blobId, link.GetGroupId());
+ Y_VERIFY_S(!length || *length == blobId.BlobSize(), SingleLineProto(link));
+ length.emplace(blobId.BlobSize());
+ }
+ if (link.HasS3Locator()) {
+ S3Locator.emplace(TS3Locator::FromProto(link.GetS3Locator()));
+ Y_VERIFY_S(!length || *length == S3Locator->Len, SingleLineProto(link));
+ length.emplace(S3Locator->Len);
+ }
+ if (!link.HasSubrangeEnd()) {
+ Y_VERIFY_S(length, SingleLineProto(link));
+ SubrangeEnd = *length;
+ }
}
void TResolvedValue::TLink::Output(IOutputStream& s) const {
- s << BlobId << '@' << GroupId << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}';
+ if (Blob) {
+ const auto& [blobId, groupId] = *Blob;
+ s << blobId << '@' << groupId;
+ }
+ if (S3Locator) {
+ s << S3Locator->ToString();
+ }
+ s << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}';
}
TString TResolvedValue::TLink::ToString() const {
diff --git a/ydb/core/blob_depot/agent/resolved_value.h b/ydb/core/blob_depot/agent/resolved_value.h
index a3194e65ec6..7429c3b59c1 100644
--- a/ydb/core/blob_depot/agent/resolved_value.h
+++ b/ydb/core/blob_depot/agent/resolved_value.h
@@ -6,8 +6,8 @@ namespace NKikimr::NBlobDepot {
struct TResolvedValue {
struct TLink {
- TLogoBlobID BlobId;
- ui32 GroupId;
+ std::optional<std::tuple<TLogoBlobID, ui32>> Blob;
+ std::optional<TS3Locator> S3Locator;
ui32 SubrangeBegin;
ui32 SubrangeEnd;
@@ -16,10 +16,7 @@ namespace NKikimr::NBlobDepot {
void Output(IOutputStream& s) const;
TString ToString() const;
- friend bool operator ==(const TLink& x, const TLink& y) {
- return x.BlobId == y.BlobId && x.GroupId == y.GroupId && x.SubrangeBegin == y.SubrangeBegin &&
- x.SubrangeEnd == y.SubrangeEnd;
- }
+ friend std::strong_ordering operator <=>(const TLink&, const TLink&) = default;
};
bool Defined = false;
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index a1d8aeb6f20..5ead9d58a5e 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -15,19 +15,36 @@ namespace NKikimr::NBlobDepot {
bool WrittenBeyondBarrier = false;
NKikimrBlobDepot::TEvCommitBlobSeq CommitBlobSeq;
TBlobSeqId BlobSeqId;
+ std::optional<TS3Locator> LocatorInFlight;
+ TActorId WriterActorId;
+
+ struct TLifetimeToken {};
+ std::shared_ptr<TLifetimeToken> LifetimeToken;
public:
using TBlobStorageQuery::TBlobStorageQuery;
void OnDestroy(bool success) override {
- if (IsInFlight) {
+ if (IsInFlight || LocatorInFlight) {
Y_ABORT_UNLESS(!success);
- RemoveBlobSeqFromInFlight();
+ if (IsInFlight) {
+ RemoveBlobSeqFromInFlight();
+ }
NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg;
- BlobSeqId.ToProto(msg.AddItems());
+ if (IsInFlight) {
+ BlobSeqId.ToProto(msg.AddItems());
+ }
+ if (LocatorInFlight) {
+ LocatorInFlight->ToProto(msg.AddS3Locators());
+ }
Agent.Issue(std::move(msg), this, nullptr);
}
+ if (WriterActorId) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, WriterActorId, Agent.SelfId(),
+ nullptr, 0));
+ }
+
TBlobStorageQuery::OnDestroy(success);
}
@@ -67,6 +84,24 @@ namespace NKikimr::NBlobDepot {
void IssuePuts() {
Y_ABORT_UNLESS(!PutsIssued);
+ auto prepare = [&] {
+ Y_ABORT_UNLESS(CommitBlobSeq.ItemsSize() == 0);
+ auto *commitItem = CommitBlobSeq.AddItems();
+ commitItem->SetKey(Request.Id.AsBinaryString());
+ for (const auto& [tabletId, generation] : Request.ExtraBlockChecks) {
+ auto *p = commitItem->AddExtraBlockChecks();
+ p->SetTabletId(tabletId);
+ p->SetGeneration(generation);
+ }
+ return commitItem;
+ };
+
+ if (const auto& s3 = Agent.S3BackendSettings; s3 && s3->HasSyncMode()) {
+ auto *commitItem = prepare();
+ commitItem->MutableS3Locator();
+ return IssueS3Put();
+ }
+
const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data);
if (it == Agent.ChannelKinds.end()) {
return EndWithError(NKikimrProto::ERROR, "no Data channels");
@@ -90,9 +125,7 @@ namespace NKikimr::NBlobDepot {
BDEV_QUERY(BDEV09, "TEvPut_new", (U.BlobId, Request.Id), (U.BufferSize, Request.Buffer.size()),
(U.HandleClass, Request.HandleClass));
- Y_ABORT_UNLESS(CommitBlobSeq.ItemsSize() == 0);
- auto *commitItem = CommitBlobSeq.AddItems();
- commitItem->SetKey(Request.Id.AsBinaryString());
+ auto *commitItem = prepare();
auto *locator = commitItem->MutableBlobLocator();
BlobSeqId.ToProto(locator->MutableBlobSeqId());
//locator->SetChecksum(Crc32c(Request.Buffer.data(), Request.Buffer.size()));
@@ -192,15 +225,23 @@ namespace NKikimr::NBlobDepot {
}
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
- if (auto *p = std::get_if<TEvBlobStorage::TEvPutResult*>(&response)) {
- HandlePutResult(std::move(context), **p);
- } else if (auto *p = std::get_if<TEvBlobDepot::TEvCommitBlobSeqResult*>(&response)) {
- HandleCommitBlobSeqResult(std::move(context), (*p)->Record);
- } else if (std::holds_alternative<TTabletDisconnected>(response)) {
- EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
- } else {
- Y_ABORT("unexpected response");
- }
+ std::visit(TOverloaded{
+ [&](TEvBlobStorage::TEvPutResult *ev) {
+ HandlePutResult(std::move(context), *ev);
+ },
+ [&](TEvBlobDepot::TEvCommitBlobSeqResult *ev) {
+ HandleCommitBlobSeqResult(std::move(context), ev->Record);
+ },
+ [&](TTabletDisconnected) {
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ },
+ [&](TEvBlobDepot::TEvPrepareWriteS3Result *ev) {
+ HandlePrepareWriteS3Result(std::move(context), ev->Record);
+ },
+ [&](auto /*other*/) {
+ Y_ABORT("unexpected response");
+ }
+ }, response);
}
void HandlePutResult(TRequestContext::TPtr /*context*/, TEvBlobStorage::TEvPutResult& msg) {
@@ -284,6 +325,107 @@ namespace NKikimr::NBlobDepot {
ui64 GetTabletId() const override {
return Request.Id.TabletID();
}
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // writing directly to S3
+
+ void IssueS3Put() {
+ NKikimrBlobDepot::TEvPrepareWriteS3 query;
+ auto *item = query.AddItems();
+ item->SetKey(Request.Id.AsBinaryString());
+ for (const auto& [tabletId, generation] : Request.ExtraBlockChecks) {
+ auto *p = item->AddExtraBlockChecks();
+ p->SetTabletId(tabletId);
+ p->SetGeneration(generation);
+ }
+ item->SetLen(Request.Id.BlobSize());
+ Agent.Issue(query, this, nullptr);
+ }
+
+ void HandlePrepareWriteS3Result(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvPrepareWriteS3Result& msg) {
+ Y_ABORT_UNLESS(msg.ItemsSize() == 1);
+ const auto& item = msg.GetItems(0);
+ if (item.GetStatus() != NKikimrProto::OK) {
+ return EndWithError(item.GetStatus(), item.GetErrorReason());
+ }
+
+ auto *commitItem = CommitBlobSeq.MutableItems(0);
+ auto *locator = commitItem->MutableS3Locator();
+ locator->CopyFrom(item.GetS3Locator());
+
+ LocatorInFlight.emplace(TS3Locator::FromProto(*locator));
+
+ class TWriteActor : public TActor<TWriteActor> {
+ std::weak_ptr<TLifetimeToken> LifetimeToken;
+ TPutQuery* const PutQuery;
+
+ public:
+ TWriteActor(std::weak_ptr<TLifetimeToken> lifetimeToken, TPutQuery *putQuery)
+ : TActor(&TThis::StateFunc)
+ , LifetimeToken(std::move(lifetimeToken))
+ , PutQuery(putQuery)
+ {}
+
+ void Handle(NWrappers::TEvExternalStorage::TEvPutObjectResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+ Finish(msg.IsSuccess()
+ ? std::nullopt
+ : std::make_optional<TString>(msg.GetError().GetMessage()));
+ }
+
+ void HandleUndelivered() {
+ Finish("event undelivered");
+ }
+
+ void Finish(std::optional<TString>&& error) {
+ if (!LifetimeToken.expired()) {
+ InvokeOtherActor(PutQuery->Agent, &TBlobDepotAgent::Invoke, [&] {
+ PutQuery->OnPutS3ObjectResponse(std::move(error));
+ });
+ }
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NWrappers::TEvExternalStorage::TEvPutObjectResponse, Handle)
+ cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+
+ TString key = TS3Locator::FromProto(*locator).MakeObjectName(Agent.S3BasePath);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA54, "starting WriteActor", (AgentId, Agent.LogId),
+ (QueryId, GetQueryId()), (Key, key));
+
+ LifetimeToken = std::make_shared<TLifetimeToken>();
+ WriterActorId = Agent.RegisterWithSameMailbox(new TWriteActor(LifetimeToken, this));
+
+ TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, WriterActorId,
+ new NWrappers::TEvExternalStorage::TEvPutObjectRequest(
+ Aws::S3::Model::PutObjectRequest()
+ .WithBucket(std::move(Agent.S3BackendSettings->GetSettings().GetBucket()))
+ .WithKey(std::move(key))
+ .AddMetadata("key", Request.Id.ToString()),
+ Request.Buffer.ExtractUnderlyingContainerOrCopy<TString>()),
+ IEventHandle::FlagTrackDelivery));
+ }
+
+ void OnPutS3ObjectResponse(std::optional<TString>&& error) {
+ STLOG(error ? PRI_WARN : PRI_DEBUG, BLOB_DEPOT_AGENT, BDA53, "OnPutS3ObjectResponse",
+ (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Error, error));
+
+ WriterActorId = {};
+
+ if (error) {
+ // LocatorInFlight is not reset here on purpose: OnDestroy will generate spoiled blob message to the
+ // tablet
+ EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to put object to S3: " << *error);
+ } else {
+ LocatorInFlight.reset();
+ IssueCommitBlobSeq(false);
+ }
+ }
};
return new TPutQuery(*this, std::move(ev));
diff --git a/ydb/core/blob_depot/agent/ya.make b/ydb/core/blob_depot/agent/ya.make
index 82195569e12..515da83e268 100644
--- a/ydb/core/blob_depot/agent/ya.make
+++ b/ydb/core/blob_depot/agent/ya.make
@@ -37,6 +37,7 @@ LIBRARY()
ydb/core/blobstorage/vdisk/common
ydb/core/blob_depot
ydb/core/protos
+ ydb/core/wrappers
)
END()
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 19cc2596242..78abeb4ca17 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -3,6 +3,7 @@
#include "blocks.h"
#include "garbage_collection.h"
#include "data.h"
+#include "s3.h"
#include "data_uncertain.h"
#include "space_monitor.h"
@@ -21,6 +22,7 @@ namespace NKikimr::NBlobDepot {
, BlocksManager(new TBlocksManager(this))
, BarrierServer(new TBarrierServer(this))
, Data(new TData(this))
+ , S3Manager(new TS3Manager(this))
, SpaceMonitor(new TSpaceMonitor(this))
, JsonHandler(std::bind(&TBlobDepot::RenderJson, this, std::placeholders::_1), TEvPrivate::EvJsonTimer, TEvPrivate::EvJsonUpdate)
{}
@@ -39,6 +41,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle);
hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle);
hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
+ hFunc(TEvBlobDepot::TEvPrepareWriteS3, Handle);
default:
Y_ABORT();
@@ -110,6 +113,7 @@ namespace NKikimr::NBlobDepot {
fFunc(TEvBlobDepot::EvQueryBlocks, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe);
+ fFunc(TEvBlobDepot::EvPrepareWriteS3, handleFromAgentPipe);
fFunc(TEvPrivate::EvDeliver, handleDelivery);
@@ -132,6 +136,10 @@ namespace NKikimr::NBlobDepot {
cFunc(TEvPrivate::EvJsonTimer, JsonHandler.HandleTimer);
cFunc(TEvPrivate::EvJsonUpdate, JsonHandler.HandleUpdate);
+ fFunc(TEvPrivate::EvUploadResult, S3Manager->Handle);
+ fFunc(TEvPrivate::EvDeleteResult, S3Manager->Handle);
+ fFunc(TEvPrivate::EvScanFound, S3Manager->Handle);
+
default:
if (!HandleDefaultEvents(ev, SelfId())) {
Y_ABORT("unexpected event Type# 0x%08" PRIx32, type);
@@ -144,12 +152,14 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::PassAway() {
- for (const TActorId& actorId : {GroupAssimilatorId}) {
+ for (const TActorId& actorId : {GroupAssimilatorId, S3Manager->GetWrapperId()}) {
if (actorId) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0));
}
}
+ S3Manager->TerminateAllActors();
+
TActor::PassAway();
}
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 96aff19f075..b881a588dde 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -34,6 +34,10 @@ namespace NKikimr::NBlobDepot {
EvDeliver,
EvJsonTimer,
EvJsonUpdate,
+ EvUploadResult,
+ EvDeleteResult,
+ EvScanFound,
+ EvScanContinue,
};
};
@@ -77,6 +81,8 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobStorage::TPDiskSpaceColor::E LastPushedSpaceColor = {};
float LastPushedApproximateFreeSpaceShare = 0.0f;
+
+ THashSet<TS3Locator> S3WritesInFlight;
};
struct TPipeServerContext {
@@ -179,6 +185,7 @@ namespace NKikimr::NBlobDepot {
KickSpaceMonitor();
StartDataLoad();
UpdateThroughputs();
+ InitS3Manager();
}
void StartDataLoad();
@@ -269,6 +276,15 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
void Handle(TEvBlobDepot::TEvDiscardSpoiledBlobSeq::TPtr ev);
+ void Handle(TEvBlobDepot::TEvPrepareWriteS3::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // S3 operations
+
+ class TS3Manager;
+ std::unique_ptr<TS3Manager> S3Manager;
+
+ void InitS3Manager();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Space monitoring
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index cd160a07dc3..7abb5c522b8 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -1,6 +1,7 @@
#include "data.h"
#include "data_uncertain.h"
#include "garbage_collection.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
@@ -22,7 +23,7 @@ namespace NKikimr::NBlobDepot {
bool TData::TValue::Validate(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) {
if (!item.HasBlobLocator()) {
- return false;
+ return true;
}
const auto& locator = item.GetBlobLocator();
return locator.HasGroupId() &&
@@ -42,28 +43,32 @@ namespace NKikimr::NBlobDepot {
return false;
}
- Y_DEBUG_ABORT_UNLESS(chain.HasLocator());
- const auto& locator1 = chain.GetLocator();
- Y_DEBUG_ABORT_UNLESS(locator1.HasGroupId() && locator1.HasBlobSeqId() && locator1.HasTotalDataLen());
+ if (chain.HasBlobLocator() != item.HasBlobLocator()) {
+ return false;
+ } else if (chain.HasBlobLocator() && item.HasBlobLocator()) {
+ Y_DEBUG_ABORT_UNLESS(chain.HasBlobLocator());
+ const auto& locator1 = chain.GetBlobLocator();
+ Y_DEBUG_ABORT_UNLESS(locator1.HasGroupId() && locator1.HasBlobSeqId() && locator1.HasTotalDataLen());
- Y_DEBUG_ABORT_UNLESS(item.HasBlobLocator());
- const auto& locator2 = item.GetBlobLocator();
- Y_DEBUG_ABORT_UNLESS(locator2.HasGroupId() && locator2.HasBlobSeqId() && locator2.HasTotalDataLen());
+ Y_DEBUG_ABORT_UNLESS(item.HasBlobLocator());
+ const auto& locator2 = item.GetBlobLocator();
+ Y_DEBUG_ABORT_UNLESS(locator2.HasGroupId() && locator2.HasBlobSeqId() && locator2.HasTotalDataLen());
#define COMPARE_FIELD(NAME) \
- if (locator1.Has##NAME() != locator2.Has##NAME()) { \
- return false; \
- } else if (locator1.Has##NAME() && locator1.Get##NAME() != locator2.Get##NAME()) { \
- return false; \
- }
- COMPARE_FIELD(GroupId)
- COMPARE_FIELD(Checksum)
- COMPARE_FIELD(TotalDataLen)
- COMPARE_FIELD(FooterLen)
+ if (locator1.Has##NAME() != locator2.Has##NAME()) { \
+ return false; \
+ } else if (locator1.Has##NAME() && locator1.Get##NAME() != locator2.Get##NAME()) { \
+ return false; \
+ }
+ COMPARE_FIELD(GroupId)
+ COMPARE_FIELD(Checksum)
+ COMPARE_FIELD(TotalDataLen)
+ COMPARE_FIELD(FooterLen)
#undef COMPARE_FIELD
- if (TBlobSeqId::FromProto(locator1.GetBlobSeqId()) != TBlobSeqId::FromProto(locator2.GetBlobSeqId())) {
- return false;
+ if (TBlobSeqId::FromProto(locator1.GetBlobSeqId()) != TBlobSeqId::FromProto(locator2.GetBlobSeqId())) {
+ return false;
+ }
}
return true;
@@ -87,7 +92,9 @@ namespace NKikimr::NBlobDepot {
{
auto& [key, value] = *it;
- std::vector<TLogoBlobID> deleteQ;
+ std::vector<TLogoBlobID> deleteBlobs;
+ std::vector<TS3Locator> deleteS3;
+
const bool uncertainWriteBefore = value.UncertainWrite;
const bool wasUncertain = value.IsWrittenUncertainly();
const bool wasGoingToAssimilate = value.GoingToAssimilate;
@@ -99,12 +106,16 @@ namespace NKikimr::NBlobDepot {
#endif
if (!inserted) {
- EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
- const auto it = RefCount.find(id);
- Y_ABORT_UNLESS(it != RefCount.end());
- if (!--it->second) {
- deleteQ.push_back(id);
+ auto dropRefCount = [&](auto& map, auto&& key, auto& deleteQ) {
+ if (ui32& rc = --map[key]; !rc) {
+ deleteQ.push_back(std::move(key));
+ } else {
+ Y_ABORT_UNLESS(rc != Max<ui32>());
}
+ };
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), TOverloaded{
+ [&](TLogoBlobID id, ui32, ui32) { dropRefCount(RefCountBlobs, id, deleteBlobs); },
+ [&](TS3Locator locator) { dropRefCount(RefCountS3, locator, deleteS3); }
});
}
@@ -130,49 +141,101 @@ namespace NKikimr::NBlobDepot {
(Outcome, outcomeToString()), (UnderSoft, underSoft), (Inserted, inserted), (Value, value),
(UncertainWriteBefore, uncertainWriteBefore));
- EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
- const auto [it, inserted] = RefCount.try_emplace(id);
+ auto returnRefCount = [&](auto& map, auto&& key, auto& deleteQ) {
+ const auto [it, inserted] = map.try_emplace(std::move(key));
+ if (outcome != EUpdateOutcome::DROP) {
+ ++it->second;
+ } else if (inserted) {
+ deleteQ.push_back(it->first);
+ }
if (inserted) {
- Y_ABORT_UNLESS(!CanBeCollected(TBlobSeqId::FromLogoBlobId(id)));
- Y_VERIFY_DEBUG_S(id.Generation() == generation, "BlobId# " << id << " Generation# " << generation);
- Y_VERIFY_DEBUG_S(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id),
- "LeastExpectedBlobId# " << Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation)
- << " Id# " << id
- << " Generation# " << generation);
- AddFirstMentionedBlob(id);
+ AddFirstMentionedBlob(it->first);
}
- if (outcome == EUpdateOutcome::DROP) {
- if (inserted) {
- deleteQ.push_back(id);
+ return inserted;
+ };
+
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), TOverloaded{
+ [&](TLogoBlobID id, ui32, ui32) {
+ if (returnRefCount(RefCountBlobs, id, deleteBlobs)) {
+ auto makeValueChain = [&] {
+ TStringStream s;
+ s << '{';
+ for (bool first = true; const auto& item : value.ValueChain) {
+ s << (std::exchange(first, false) ? "" : " ") << SingleLineProto(item);
+ }
+ s << '}';
+ return s.Str();
+ };
+ Y_VERIFY_S(!CanBeCollected(TBlobSeqId::FromLogoBlobId(id)), "BlobId# " << id
+ << " ValueChain# " << makeValueChain());
+ Y_VERIFY_DEBUG_S(id.Generation() == generation, "BlobId# " << id << " Generation# " << generation);
+ Y_VERIFY_DEBUG_S(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id),
+ "LeastExpectedBlobId# " << Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation)
+ << " Id# " << id
+ << " Generation# " << generation);
}
- } else {
- ++it->second;
+ },
+ [&](TS3Locator locator) {
+ returnRefCount(RefCountS3, locator, deleteS3);
}
});
- auto filter = [&](const TLogoBlobID& id) {
- const auto it = RefCount.find(id);
- Y_ABORT_UNLESS(it != RefCount.end());
+ NIceDb::TNiceDb db(txc.DB);
+
+ auto filterBlobs = [&](TLogoBlobID id) {
+ const auto it = RefCountBlobs.find(id);
+ Y_ABORT_UNLESS(it != RefCountBlobs.end());
if (it->second) {
- return true; // remove this blob from deletion queue, it still has references
- } else {
- InFlightTrash.emplace(cookie, id);
- InFlightTrashSize += id.BlobSize();
- Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize;
- NIceDb::TNiceDb(txc.DB).Table<Schema::Trash>().Key(id.AsBinaryString()).Update();
- RefCount.erase(it);
- TotalStoredDataSize -= id.BlobSize();
- Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_DATA_SIZE] = TotalStoredDataSize;
- return false; // keep this blob in deletion queue
+ return true;
}
+ RefCountBlobs.erase(it);
+
+ const size_t size = id.BlobSize();
+ InFlightTrashSize += size;
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize;
+ TotalStoredDataSize -= size;
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_DATA_SIZE] = TotalStoredDataSize;
+
+ InFlightTrashBlobs.emplace(cookie, id);
+ db.Table<Schema::Trash>().Key(id.AsBinaryString()).Update();
+ return false; // keep this blob in deletion queue
};
- std::sort(deleteQ.begin(), deleteQ.end());
- deleteQ.erase(std::unique(deleteQ.begin(), deleteQ.end()), deleteQ.end());
- deleteQ.erase(std::remove_if(deleteQ.begin(), deleteQ.end(), filter), deleteQ.end());
- if (!deleteQ.empty()) {
- UncertaintyResolver->DropBlobs(deleteQ);
+
+ auto filterS3 = [&](TS3Locator locator) {
+ const auto it = RefCountS3.find(locator);
+ Y_ABORT_UNLESS(it != RefCountS3.end());
+ if (it->second) {
+ return true;
+ }
+
+ RefCountS3.erase(it);
+ TotalS3DataSize -= locator.Len;
+
+ AddToS3Trash(locator, txc, cookie);
+ return false; // keep this blob in deletion queue
+ };
+
+ auto process = [&](auto& deleteQ, auto&& filterFunc) {
+ std::ranges::sort(deleteQ);
+ if (auto [first, last] = std::ranges::unique(deleteQ); first != last) {
+ deleteQ.erase(first, last);
+ }
+ if (auto [first, last] = std::ranges::remove_if(deleteQ, filterFunc); first != last) {
+ deleteQ.erase(first, last);
+ }
+ };
+
+ process(deleteBlobs, filterBlobs);
+ process(deleteS3, filterS3);
+
+ if (!deleteBlobs.empty()) {
+ UncertaintyResolver->DropBlobs(deleteBlobs);
}
+ auto& counters = Self->TabletCounters->Simple();
+ counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_OBJECTS] = RefCountS3.size();
+ counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_SIZE] = TotalS3DataSize;
+
auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::Data>().Key(key.MakeBinaryKey());
switch (outcome) {
case EUpdateOutcome::DROP:
@@ -196,6 +259,7 @@ namespace NKikimr::NBlobDepot {
}
if (wasUncertain && !value.IsWrittenUncertainly()) {
UncertaintyResolver->MakeKeyCertain(key);
+ Self->S3Manager->OnKeyWritten(key, value.ValueChain);
}
if (wasGoingToAssimilate != value.GoingToAssimilate) {
const i64 sign = value.GoingToAssimilate - wasGoingToAssimilate;
@@ -223,15 +287,7 @@ namespace NKikimr::NBlobDepot {
Y_ABORT_UNLESS(IsKeyLoaded(key));
UpdateKey(key, txc, cookie, "UpdateKey", [&](TValue& value, bool inserted) {
if (!inserted) { // update value items
- value.Meta = item.GetMeta();
- value.Public = false;
- value.UncertainWrite = item.GetUncertainWrite();
-
- // update it to keep new blob locator
- value.ValueChain.Clear();
- auto *chain = value.ValueChain.Add();
- auto *locator = chain->MutableLocator();
- locator->CopyFrom(item.GetBlobLocator());
+ value.UpdateFrom(item);
++value.ValueVersion;
// clear assimilation flag -- we have blob overwritten with fresh copy (of the same data)
@@ -257,7 +313,7 @@ namespace NKikimr::NBlobDepot {
}
if (value.ValueChain.empty()) {
auto *chain = value.ValueChain.Add();
- auto *locator = chain->MutableLocator();
+ auto *locator = chain->MutableBlobLocator();
locator->SetGroupId(Self->Info()->GroupFor(blobSeqId.Channel, blobSeqId.Generation));
blobSeqId.ToProto(locator->MutableBlobSeqId());
locator->SetTotalDataLen(key.GetBlobId().BlobSize());
@@ -270,6 +326,14 @@ namespace NKikimr::NBlobDepot {
});
}
+ void TData::AddToS3Trash(TS3Locator locator, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ NIceDb::TNiceDb(txc.DB).Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId)
+ .Update<Schema::TrashS3::Len>(locator.Len);
+ InFlightTrashSize += locator.Len;
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize;
+ InFlightTrashS3.emplace(cookie, locator);
+ }
+
void TData::MakeKeyCertain(const TKey& key) {
const auto it = Data.find(key);
Y_ABORT_UNLESS(it != Data.end());
@@ -349,10 +413,15 @@ namespace NKikimr::NBlobDepot {
// we can only add key that is not loaded before; if key exists, it MUST have been loaded from the dataset
const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(proto), uncertainWrite);
Y_ABORT_UNLESS(inserted);
- EnumerateBlobsForValueChain(it->second.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
- if (!RefCount[id]++) {
- AddFirstMentionedBlob(id);
+
+ auto addRefCount = [&](auto& map, auto&& key) {
+ if (!map[key]++) {
+ AddFirstMentionedBlob(key);
}
+ };
+ EnumerateBlobsForValueChain(it->second.ValueChain, Self->TabletID(), TOverloaded{
+ [&](TLogoBlobID id, ui32, ui32) { addRefCount(RefCountBlobs, id); },
+ [&](TS3Locator locator) { addRefCount(RefCountS3, locator); }
});
if (it->second.GoingToAssimilate) {
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_BYTES_TO_DECOMMIT] += it->first.GetBlobId().BlobSize();
@@ -536,6 +605,12 @@ namespace NKikimr::NBlobDepot {
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_DATA_SIZE] = TotalStoredDataSize;
}
+ void TData::AddFirstMentionedBlob(TS3Locator locator) {
+ auto& counters = Self->TabletCounters->Simple();
+ counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_OBJECTS] = RefCountS3.size();
+ counters[NKikimrBlobDepot::COUNTER_TOTAL_S3_DATA_SIZE] = TotalS3DataSize += locator.Len;
+ }
+
void TData::AccountBlob(TLogoBlobID id, bool add) {
// account record
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT81, "AccountBlob", (Id, Self->GetLogId()), (BlobId, id), (Add, add));
@@ -687,21 +762,33 @@ namespace NKikimr::NBlobDepot {
LastRecordsValidationTimestamp = now;
TTabletStorageInfo *info = Self->Info();
- THashMap<TLogoBlobID, ui32> refcounts;
+ THashMap<TLogoBlobID, ui32> refcountBlobs;
+ THashMap<TS3Locator, ui32> refcountS3;
for (const auto& [key, value] : Data) {
- EnumerateBlobsForValueChain(value.ValueChain, info->TabletID, [&](TLogoBlobID id, ui32, ui32) {
- ++refcounts[id];
+ EnumerateBlobsForValueChain(value.ValueChain, info->TabletID, TOverloaded{
+ [&](TLogoBlobID id, ui32, ui32) {
+ ++refcountBlobs[id];
+ },
+ [&](TS3Locator locator) {
+ ++refcountS3[locator];
+ }
});
}
- Y_ABORT_UNLESS(RefCount == refcounts);
+ Y_ABORT_UNLESS(RefCountBlobs == refcountBlobs);
+ Y_ABORT_UNLESS(RefCountS3 == refcountS3);
- for (const auto& [cookie, id] : InFlightTrash) {
- const bool inserted = refcounts.try_emplace(id).second;
+ for (const auto& [cookie, id] : InFlightTrashBlobs) {
+ const bool inserted = refcountBlobs.try_emplace(id).second;
+ Y_ABORT_UNLESS(inserted);
+ }
+
+ for (const auto& [cookie, locator] : InFlightTrashS3) {
+ const bool inserted = refcountS3.try_emplace(locator).second;
Y_ABORT_UNLESS(inserted);
}
THashSet<std::tuple<ui8, ui32, TLogoBlobID>> used;
- for (const auto& [id, count] : refcounts) {
+ for (const auto& [id, count] : refcountBlobs) {
const ui32 groupId = info->GroupFor(id.Channel(), id.Generation());
used.emplace(id.Channel(), groupId, id);
}
@@ -716,6 +803,10 @@ namespace NKikimr::NBlobDepot {
#endif
}
+ bool TData::IsUseful(const TS3Locator& locator) const {
+ return RefCountS3.contains(locator);
+ }
+
} // NKikimr::NBlobDepot
template<>
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 11dd7f12dd2..19e157428ea 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -141,6 +141,18 @@ namespace NKikimr::NBlobDepot {
}
}
+ TString MakeTextualKey() const {
+ if (Data.Type == BlobIdType) {
+ return GetBlobId().ToString();
+ } else if (Data.Type <= MaxInlineStringLen || Data.Type == StringType) {
+ return TString(GetStringBuf());
+ } else if (Data.Type == MinType) {
+ return {};
+ } else {
+ Y_ABORT();
+ }
+ }
+
static TKey FromBinaryKey(const TString& key, const NKikimrBlobDepot::TBlobDepotConfig& config) {
if (config.HasVirtualGroupId()) {
return TKey(TLogoBlobID::FromBinary(key));
@@ -283,14 +295,8 @@ namespace NKikimr::NBlobDepot {
, UncertainWrite(uncertainWrite)
{}
- explicit TValue(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item)
- : Meta(item.GetMeta())
- , Public(false)
- , UncertainWrite(item.GetUncertainWrite())
- {
- auto *chain = ValueChain.Add();
- auto *locator = chain->MutableLocator();
- locator->CopyFrom(item.GetBlobLocator());
+ explicit TValue(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) {
+ UpdateFrom(item);
}
explicit TValue(EKeepState keepState)
@@ -299,6 +305,21 @@ namespace NKikimr::NBlobDepot {
, UncertainWrite(false)
{}
+ void UpdateFrom(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) {
+ Meta = item.GetMeta();
+ Public = false;
+ UncertainWrite = item.GetUncertainWrite();
+
+ ValueChain.Clear();
+ auto *chain = ValueChain.Add();
+ if (item.HasBlobLocator()) {
+ chain->MutableBlobLocator()->CopyFrom(item.GetBlobLocator());
+ }
+ if (item.HasS3Locator()) {
+ chain->MutableS3Locator()->CopyFrom(item.GetS3Locator());
+ }
+ }
+
bool IsWrittenUncertainly() const {
return UncertainWrite && !ValueChain.empty();
}
@@ -420,17 +441,20 @@ namespace NKikimr::NBlobDepot {
bool Loaded = false;
std::map<TKey, TValue> Data;
TClosedIntervalSet<TKey> LoadedKeys; // keys that are already scanned and loaded in the local database
- THashMap<TLogoBlobID, ui32> RefCount;
+ THashMap<TLogoBlobID, ui32> RefCountBlobs;
+ THashMap<TS3Locator, ui32> RefCountS3;
THashMap<std::tuple<ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
std::optional<TLogoBlobID> LastAssimilatedBlobId;
THashSet<std::tuple<ui8, ui32>> AlreadyCutHistory;
ui64 TotalStoredDataSize = 0;
ui64 TotalStoredTrashSize = 0;
ui64 InFlightTrashSize = 0;
+ ui64 TotalS3DataSize = 0;
friend class TGroupAssimilator;
- THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
+ THashMultiMap<void*, TLogoBlobID> InFlightTrashBlobs; // being committed, but not yet confirmed
+ THashMultiMap<void*, TS3Locator> InFlightTrashS3; // being committed, but not yet confirmed
class TTxIssueGC;
class TTxConfirmGC;
@@ -635,6 +659,8 @@ namespace NKikimr::NBlobDepot {
void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, bool keep, bool doNotKeep,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void AddToS3Trash(TS3Locator locator, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+
void MakeKeyCertain(const TKey& key);
void HandleCommitCertainKeys();
@@ -662,6 +688,7 @@ namespace NKikimr::NBlobDepot {
void TrimChannelHistory(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted);
void AddFirstMentionedBlob(TLogoBlobID id);
+ void AddFirstMentionedBlob(TS3Locator locator);
void AccountBlob(TLogoBlobID id, bool add);
bool CanBeCollected(TBlobSeqId id) const;
@@ -670,7 +697,10 @@ namespace NKikimr::NBlobDepot {
template<typename TCallback>
void EnumerateRefCount(TCallback&& callback) {
- for (const auto& [key, value] : RefCount) {
+ for (const auto& [key, value] : RefCountBlobs) {
+ callback(key, value);
+ }
+ for (const auto& [key, value] : RefCountS3) {
callback(key, value);
}
}
@@ -687,12 +717,20 @@ namespace NKikimr::NBlobDepot {
void StartLoad();
bool LoadTrash(NTabletFlatExecutor::TTransactionContext& txc, TString& from, bool& progress);
+ bool LoadTrashS3(NTabletFlatExecutor::TTransactionContext& txc, TS3Locator& from, bool& progress);
void OnLoadComplete();
bool IsLoaded() const { return Loaded; }
bool IsKeyLoaded(const TKey& key) const { return Loaded || LoadedKeys[key]; }
bool EnsureKeyLoaded(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc);
+ template<typename TRecord>
+ bool LoadMissingKeys(const TRecord& record, NTabletFlatExecutor::TTransactionContext& txc);
+
+ std::optional<TString> CheckKeyAgainstBarrier(const TKey& key);
+
+ bool IsUseful(const TS3Locator& locator) const;
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class TResolveDecommitActor;
diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp
index bca035fbcb9..25cb263c8fb 100644
--- a/ydb/core/blob_depot/data_gc.cpp
+++ b/ydb/core/blob_depot/data_gc.cpp
@@ -1,5 +1,6 @@
#include "data.h"
#include "schema.h"
+#include "garbage_collection.h"
namespace NKikimr::NBlobDepot {
@@ -141,4 +142,23 @@ namespace NKikimr::NBlobDepot {
Self->Execute(std::make_unique<TTxHardGC>(Self, channel, groupId, hardGenStep));
}
+ std::optional<TString> TData::CheckKeyAgainstBarrier(const TKey& key) {
+ const auto& v = key.AsVariant();
+ if (const auto *id = std::get_if<TLogoBlobID>(&v)) {
+ bool underSoft, underHard;
+ Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard);
+ if (underHard) {
+ return TStringBuilder() << "under hard barrier# " << Self->BarrierServer->ToStringBarrier(
+ id->TabletID(), id->Channel(), true);
+ } else if (underSoft) {
+ const TData::TValue *value = Self->Data->FindKey(key);
+ if (!value || value->KeepState != NKikimrBlobDepot::EKeepState::Keep) {
+ return TStringBuilder() << "under soft barrier# " << Self->BarrierServer->ToStringBarrier(
+ id->TabletID(), id->Channel(), false);
+ }
+ }
+ }
+ return std::nullopt;
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index 8cec6e3f570..113a869fd7c 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -2,6 +2,7 @@
#include "schema.h"
#include "garbage_collection.h"
#include "coro_tx.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
@@ -12,7 +13,7 @@ namespace NKikimr::NBlobDepot {
bool progress = false;
TString trash;
- bool trashLoaded = false;
+ TS3Locator s3;
TScanRange r{
.Begin = TKey::Min(),
@@ -21,7 +22,8 @@ namespace NKikimr::NBlobDepot {
.PrechargeBytes = 1'000'000,
};
- while (!(trashLoaded = LoadTrash(*TCoroTx::GetTxc(), trash, progress)) ||
+ while (!LoadTrash(*TCoroTx::GetTxc(), trash, progress) ||
+ !LoadTrashS3(*TCoroTx::GetTxc(), s3, progress) ||
!ScanRange(r, TCoroTx::GetTxc(), &progress, [](const TKey&, const TValue&) { return true; })) {
if (std::exchange(progress, false)) {
TCoroTx::FinishTx();
@@ -61,6 +63,36 @@ namespace NKikimr::NBlobDepot {
return true;
}
+ bool TData::LoadTrashS3(NTabletFlatExecutor::TTransactionContext& txc, TS3Locator& from, bool& progress) {
+ NIceDb::TNiceDb db(txc.DB);
+ auto table = db.Table<Schema::TrashS3>().GreaterOrEqual(from.Generation, from.KeyId);
+ static constexpr ui64 PrechargeRows = 10'000;
+ static constexpr ui64 PrechargeBytes = 1'000'000;
+ if (!table.Precharge(PrechargeRows, PrechargeBytes)) {
+ return false;
+ }
+ auto rows = table.Select();
+ if (!rows.IsReady()) {
+ return false;
+ }
+ while (rows.IsValid()) {
+ TS3Locator item{
+ .Len = rows.GetValue<Schema::TrashS3::Len>(),
+ .Generation = rows.GetValue<Schema::TrashS3::Generation>(),
+ .KeyId = rows.GetValue<Schema::TrashS3::KeyId>(),
+ };
+ if (item != from) {
+ Self->S3Manager->AddTrashToCollect(item);
+ from = item;
+ progress = true;
+ }
+ if (!rows.Next()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
void TData::OnLoadComplete() {
Self->Data->LoadedKeys([&](const TKey& left, const TKey& right) {
// verify that LoadedKeys == {Min, Max} exactly
@@ -111,6 +143,23 @@ namespace NKikimr::NBlobDepot {
}
}
+ template<typename TRecord>
+ bool TData::LoadMissingKeys(const TRecord& record, NTabletFlatExecutor::TTransactionContext& txc) {
+ if (IsLoaded()) {
+ return true;
+ }
+ for (const auto& item : record.GetItems()) {
+ auto key = TKey::FromBinaryKey(item.GetKey(), Self->Config);
+ if (!EnsureKeyLoaded(key, txc)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ template bool TData::LoadMissingKeys(const NKikimrBlobDepot::TEvCommitBlobSeq& record, NTabletFlatExecutor::TTransactionContext& txc);
+ template bool TData::LoadMissingKeys(const NKikimrBlobDepot::TEvPrepareWriteS3& record, NTabletFlatExecutor::TTransactionContext& txc);
+
void TBlobDepot::StartDataLoad() {
Data->StartLoad();
}
diff --git a/ydb/core/blob_depot/data_mon.cpp b/ydb/core/blob_depot/data_mon.cpp
index 174ca8a53e6..351d6818f53 100644
--- a/ydb/core/blob_depot/data_mon.cpp
+++ b/ydb/core/blob_depot/data_mon.cpp
@@ -1,6 +1,7 @@
#include "data.h"
#include "data_uncertain.h"
#include "mon_main.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
@@ -16,9 +17,13 @@ namespace NKikimr::NBlobDepot {
KEYVALUE_TABLE({
KEYVALUE_P("Loaded", Loaded ? "true" : "false");
KEYVALUE_P("Data size, number of keys", Data.size());
- KEYVALUE_P("RefCount size, number of blobs", RefCount.size());
+ KEYVALUE_P("RefCount size, number of blobs", RefCountBlobs.size());
KEYVALUE_P("Total stored data size, bytes", FormatByteSize(TotalStoredDataSize));
KEYVALUE_P("Keys made certain, number of keys", KeysMadeCertain.size());
+ KEYVALUE_P("Total number of useful S3 objects", RefCountS3.size());
+ KEYVALUE_P("Total bytes in useful S3 objects", FormatByteSize(TotalS3DataSize));
+ KEYVALUE_P("Total number of trash S3 objects", Self->S3Manager->GetTotalS3TrashObjects());
+ KEYVALUE_P("Total bytes in trash S3 objects", FormatByteSize(Self->S3Manager->GetTotalS3TrashSize()));
})
}
}
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index 0f743eda5cb..9dc71cd0a16 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -215,17 +215,23 @@ namespace NKikimr::NBlobDepot {
LogoBlobIDFromLogoBlobID(key.GetBlobId(), out->MutableBlobId());
item.SetReliablyWritten(reliablyWritten);
} else {
- EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) {
- if (begin != end) {
- auto *out = item.AddValueChain();
- out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation()));
- LogoBlobIDFromLogoBlobID(id, out->MutableBlobId());
- if (begin) {
- out->SetSubrangeBegin(begin);
- }
- if (end != id.BlobSize()) {
- out->SetSubrangeEnd(end);
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), TOverloaded {
+ [&](TLogoBlobID id, ui32 begin, ui32 end) {
+ if (begin != end) {
+ auto *out = item.AddValueChain();
+ out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation()));
+ LogoBlobIDFromLogoBlobID(id, out->MutableBlobId());
+ if (begin) {
+ out->SetSubrangeBegin(begin);
+ }
+ if (end != id.BlobSize()) {
+ out->SetSubrangeEnd(end);
+ }
}
+ },
+ [&](TS3Locator locator) {
+ auto *out = item.AddValueChain();
+ locator.ToProto(out->MutableS3Locator());
}
});
item.SetReliablyWritten(true);
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
index 1fca22e9165..802653c35f6 100644
--- a/ydb/core/blob_depot/data_trash.cpp
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -1,11 +1,12 @@
#include "data.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
void TData::CommitTrash(void *cookie) {
- auto [first, last] = InFlightTrash.equal_range(cookie);
+ auto [first, last] = InFlightTrashBlobs.equal_range(cookie);
std::unordered_set<TRecordsPerChannelGroup*> records;
for (auto it = first; it != last; ++it) {
auto& record = GetRecordsPerChannelGroup(it->second);
@@ -13,12 +14,20 @@ namespace NKikimr::NBlobDepot {
records.insert(&record);
InFlightTrashSize -= it->second.BlobSize();
}
- InFlightTrash.erase(first, last);
- Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize;
+ InFlightTrashBlobs.erase(first, last);
for (TRecordsPerChannelGroup *record : records) {
record->CollectIfPossible(this);
}
+
+ auto [s3first, s3last] = InFlightTrashS3.equal_range(cookie);
+ for (auto it = s3first; it != s3last; ++it) {
+ Self->S3Manager->AddTrashToCollect(it->second);
+ InFlightTrashSize -= it->second.Len;
+ }
+ InFlightTrashS3.erase(s3first, s3last);
+
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_IN_FLIGHT_TRASH_SIZE] = InFlightTrashSize;
}
void TData::HandleTrash(TRecordsPerChannelGroup& record) {
diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp
index 51fe983fffc..e5a8b5377de 100644
--- a/ydb/core/blob_depot/data_uncertain.cpp
+++ b/ydb/core/blob_depot/data_uncertain.cpp
@@ -29,8 +29,12 @@ namespace NKikimr::NBlobDepot {
(Sender, entry->Result.GetSender()), (Cookie, entry->Result.GetCookie()), (Key, key));
// obtain list of blobs belonging to this key only once and here
- EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
- keyContext.BlobState.emplace(id, std::make_tuple(EKeyBlobState::INITIAL, TString()));
+ EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), TOverloaded{
+ [&](TLogoBlobID id, ui32, ui32) {
+ keyContext.BlobState.emplace(id, std::make_tuple(EKeyBlobState::INITIAL, TString()));
+ },
+ [&](TS3Locator) {
+ }
});
// try to process the blobs
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index aa001274f42..ca94809b648 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -28,6 +28,8 @@ namespace NKikimr {
EvResolveResult,
EvDiscardSpoiledBlobSeq,
EvPushMetrics,
+ EvPrepareWriteS3,
+ EvPrepareWriteS3Result,
};
#define BLOBDEPOT_PARAM_ARG(ARG) std::optional<std::decay_t<decltype(Record.Get##ARG())>> param##ARG,
@@ -73,6 +75,8 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvDiscardSpoiledBlobSeq);
BLOBDEPOT_EVENT_PB(EvPushMetrics, BytesRead, BytesWritten);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvPrepareWriteS3);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvPrepareWriteS3Result);
template<typename TEvent>
struct TResponseFor {};
@@ -85,6 +89,7 @@ namespace NKikimr {
template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; };
template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; };
template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; };
+ template<> struct TResponseFor<TEvPrepareWriteS3> { using Type = TEvPrepareWriteS3Result; };
template<typename TRequestEvent, typename... TArgs>
static auto MakeResponseFor(TEventHandle<TRequestEvent>& ev, TArgs&&... args) {
@@ -106,6 +111,7 @@ namespace NKikimr {
template<> struct TEventFor<NKikimrBlobDepot::TEvResolve> { using Type = TEvResolve; };
template<> struct TEventFor<NKikimrBlobDepot::TEvCommitBlobSeq> { using Type = TEvCommitBlobSeq; };
template<> struct TEventFor<NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq> { using Type = TEvDiscardSpoiledBlobSeq; };
+ template<> struct TEventFor<NKikimrBlobDepot::TEvPrepareWriteS3> { using Type = TEvPrepareWriteS3; };
};
} // NKikimr
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
index 0ea803bd861..6da0b21ea66 100644
--- a/ydb/core/blob_depot/mon_main.cpp
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -4,6 +4,7 @@
#include "blocks.h"
#include "space_monitor.h"
#include "mon_main.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
@@ -199,14 +200,18 @@ namespace NKikimr::NBlobDepot {
key.Output(Stream);
}
TABLED() {
- bool first = true;
- for (const auto& item : value.ValueChain) {
+ for (bool first = true; const auto& item : value.ValueChain) {
if (first) {
first = false;
} else {
Stream << "<br/>";
}
- Stream << TBlobSeqId::FromProto(item.GetLocator().GetBlobSeqId()).ToString();
+ if (item.HasBlobLocator()) {
+ Stream << TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId()).ToString();
+ }
+ if (item.HasS3Locator()) {
+ Stream << TS3Locator::FromProto(item.GetS3Locator()).ToString();
+ }
if (item.HasSubrangeBegin() || item.HasSubrangeEnd()) {
Stream << "[";
if (item.HasSubrangeBegin()) {
@@ -242,9 +247,9 @@ namespace NKikimr::NBlobDepot {
TABLEH() { Stream << "blob id"; }
TABLEH() { Stream << "refcount"; }
} else {
- Self->Data->EnumerateRefCount([&](TLogoBlobID id, ui32 count) {
+ Self->Data->EnumerateRefCount([&](const auto& id, ui32 count) {
TABLER() {
- TABLED() { Stream << id; }
+ TABLED() { Stream << id.ToString(); }
TABLED() { Stream << count; }
}
});
@@ -320,13 +325,19 @@ namespace NKikimr::NBlobDepot {
auto *info = Self->Info();
std::map<ui32, std::tuple<ui64, ui64>> space;
- Self->Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) {
- const ui32 groupId = info->GroupFor(id.Channel(), id.Generation());
- auto& [current, total] = space[groupId];
- total += id.BlobSize();
- if (id.Generation() == generation) {
- current += id.BlobSize();
- }
+ Self->Data->EnumerateRefCount([&](const auto& key, ui32) {
+ TOverloaded callback{
+ [&](TLogoBlobID id) {
+ const ui32 groupId = info->GroupFor(id.Channel(), id.Generation());
+ auto& [current, total] = space[groupId];
+ total += id.BlobSize();
+ if (id.Generation() == generation) {
+ current += id.BlobSize();
+ }
+ },
+ [&](TS3Locator) {}
+ };
+ callback(key);
});
for (const auto& [groupId, _] : Self->SpaceMonitor->Groups) {
@@ -479,8 +490,12 @@ document.addEventListener("DOMContentLoaded", ready);
ui64 total = 0;
ui64 trashInFlight = 0;
ui64 trashPending = 0;
- Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) {
- total += id.BlobSize();
+ Data->EnumerateRefCount([&](const auto& key, ui32) {
+ TOverloaded callback{
+ [&](TLogoBlobID id) { total += id.BlobSize(); },
+ [&](TS3Locator) {}
+ };
+ callback(key);
});
Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) {
(inFlight ? trashInFlight : trashPending) += id.BlobSize();
@@ -567,8 +582,12 @@ document.addEventListener("DOMContentLoaded", ready);
ui64 total = 0;
ui64 trashInFlight = 0;
ui64 trashPending = 0;
- Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) {
- total += id.BlobSize();
+ Data->EnumerateRefCount([&](const auto& key, ui32) {
+ TOverloaded callback{
+ [&](TLogoBlobID id) { total += id.BlobSize(); },
+ [&](TS3Locator) {}
+ };
+ callback(key);
});
Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) {
(inFlight ? trashInFlight : trashPending) += id.BlobSize();
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index e7aa4f6ec1f..be78a554e7f 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -1,8 +1,8 @@
#include "blob_depot_tablet.h"
#include "schema.h"
#include "data.h"
-#include "garbage_collection.h"
#include "blocks.h"
+#include "s3.h"
namespace NKikimr::NBlobDepot {
@@ -30,7 +30,10 @@ namespace NKikimr::NBlobDepot {
const auto& items = Request->Get()->Record.GetItems();
Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_PUTS_INCOMING] += items.size();
for (const auto& item : items) {
- if (TData::TValue::Validate(item) && !item.GetCommitNotify()) {
+ if (!TData::TValue::Validate(item)) {
+ continue;
+ }
+ if (!item.GetCommitNotify() && item.HasBlobLocator()) {
const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId());
if (Self->Data->CanBeCollected(blobSeqId)) {
// check for internal sanity -- we can't issue barriers on given ids without confirmed trimming
@@ -54,12 +57,10 @@ namespace NKikimr::NBlobDepot {
return true;
}
- if (!LoadMissingKeys(txc)) {
+ if (!Self->Data->LoadMissingKeys(Request->Get()->Record, txc)) {
return false;
}
- NIceDb::TNiceDb db(txc.DB);
-
NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord;
std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request);
@@ -67,33 +68,70 @@ namespace NKikimr::NBlobDepot {
for (const auto& item : Request->Get()->Record.GetItems()) {
auto *responseItem = responseRecord->AddItems();
+
+ auto finishWithError = [&](NKikimrProto::EReplyStatus status, const TString& errorReason) {
+ responseItem->SetStatus(status);
+ responseItem->SetErrorReason(errorReason);
+ if (item.HasS3Locator()) {
+ const auto& locator = TS3Locator::FromProto(item.GetS3Locator());
+ const size_t numErased = agent.S3WritesInFlight.erase(locator);
+ if (locator.Generation < generation) {
+ Y_ABORT_UNLESS(!numErased);
+ Self->Data->AddToS3Trash(locator, txc, this);
+ } else {
+ Y_ABORT_UNLESS(numErased == 1);
+ Self->S3Manager->AddTrashToCollect(locator);
+ }
+ }
+ };
+
if (!TData::TValue::Validate(item)) {
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason("TEvCommitBlobSeq item protobuf is not valid");
+ finishWithError(NKikimrProto::ERROR, "TEvCommitBlobSeq item protobuf is not valid");
continue;
}
- const auto& blobLocator = item.GetBlobLocator();
- const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId());
- if (FailedBlobSeqIds.contains(blobSeqId)) {
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason("couldn't start commit sequence for blob");
- continue;
+ bool canBeCollected = false; // can the just-written-blob be collected with GC logic?
+
+ if (item.HasBlobLocator()) {
+ const auto& blobLocator = item.GetBlobLocator();
+
+ const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId());
+ if (FailedBlobSeqIds.contains(blobSeqId)) {
+ finishWithError(NKikimrProto::ERROR, "couldn't start commit sequence for blob");
+ continue;
+ }
+
+ canBeCollected = Self->Data->CanBeCollected(blobSeqId);
+
+ Y_VERIFY_DEBUG_S(canBeCollected || !CanBeCollectedBlobSeqIds.contains(blobSeqId),
+ "BlobSeqId# " << blobSeqId);
}
- responseItem->SetStatus(NKikimrProto::OK);
+ if (item.HasS3Locator()) {
+ const auto& locator = TS3Locator::FromProto(item.GetS3Locator());
+ if (locator.Generation < generation) {
+ finishWithError(NKikimrProto::ERROR, "S3 locator is obsolete");
+ continue;
+ }
+ }
- const bool canBeCollected = Self->Data->CanBeCollected(blobSeqId);
+ responseItem->SetStatus(NKikimrProto::OK);
auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
if (!item.GetCommitNotify()) {
+ bool blocksPass = true;
if (const auto& v = key.AsVariant(); const auto *id = std::get_if<TLogoBlobID>(&v)) {
- if (!Self->BlocksManager->CheckBlock(id->TabletID(), id->Generation())) {
- // FIXME(alexvru): ExtraBlockChecks?
- responseItem->SetStatus(NKikimrProto::BLOCKED);
- responseItem->SetErrorReason("block race detected");
- continue;
+ blocksPass = Self->BlocksManager->CheckBlock(id->TabletID(), id->Generation());
+ }
+ for (const auto& extra : item.GetExtraBlockChecks()) {
+ if (!blocksPass) {
+ break;
}
+ blocksPass = Self->BlocksManager->CheckBlock(extra.GetTabletId(), extra.GetGeneration());
+ }
+ if (!blocksPass) {
+ finishWithError(NKikimrProto::BLOCKED, "block race detected");
+ continue;
}
}
@@ -102,41 +140,51 @@ namespace NKikimr::NBlobDepot {
if (canBeCollected) {
// we can't accept this record, because it is potentially under already issued barrier
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason("generation race");
+ finishWithError(NKikimrProto::ERROR, "generation race");
continue;
}
- Y_VERIFY_DEBUG_S(!CanBeCollectedBlobSeqIds.contains(blobSeqId), "BlobSeqId# " << blobSeqId);
-
- TString error;
- if (!CheckKeyAgainstBarrier(key, &error)) {
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString()
- << " is being put beyond the barrier: " << error);
+ if (auto error = Self->Data->CheckKeyAgainstBarrier(key)) {
+ finishWithError(NKikimrProto::ERROR, TStringBuilder() << "BlobId# " << key.ToString()
+ << " is being put beyond the barrier: " << *error);
continue;
}
if (item.GetCommitNotify()) {
if (item.GetUncertainWrite()) {
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason("UncertainWrite along with CommitNotify");
+ finishWithError(NKikimrProto::ERROR, "UncertainWrite along with CommitNotify");
} else if (const TData::TValue *v = Self->Data->FindKey(key); v && v->SameValueChainAsIn(item)) {
Self->Data->MakeKeyCertain(key);
} else {
// data race -- this value has been altered since it was previously written
- responseItem->SetStatus(NKikimrProto::RACE);
+ finishWithError(NKikimrProto::RACE, "value has been altered since it was previously written");
}
} else {
- Y_VERIFY_DEBUG_S(AllowedBlobSeqIds.contains(blobSeqId), "BlobSeqId# " << blobSeqId);
- Y_VERIFY_DEBUG_S(
- Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation) <= blobSeqId,
- "BlobSeqId# " << blobSeqId
- << " LeastExpectedBlobId# " << Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation)
- << " Generation# " << generation);
- Y_VERIFY_DEBUG_S(blobSeqId.Generation == generation, "BlobSeqId# " << blobSeqId << " Generation# " << generation);
- Y_VERIFY_DEBUG_S(Self->Channels[blobSeqId.Channel].SequenceNumbersInFlight.contains(blobSeqId.ToSequentialNumber()),
- "BlobSeqId# " << blobSeqId);
+ if (item.HasBlobLocator()) {
+ const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId());
+ Y_VERIFY_DEBUG_S(AllowedBlobSeqIds.contains(blobSeqId), "BlobSeqId# " << blobSeqId);
+ Y_VERIFY_DEBUG_S(
+ Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation) <= blobSeqId,
+ "BlobSeqId# " << blobSeqId
+ << " LeastExpectedBlobId# " << Self->Channels[blobSeqId.Channel].GetLeastExpectedBlobId(generation)
+ << " Generation# " << generation);
+ Y_VERIFY_DEBUG_S(blobSeqId.Generation == generation, "BlobSeqId# " << blobSeqId << " Generation# " << generation);
+ Y_VERIFY_DEBUG_S(Self->Channels[blobSeqId.Channel].SequenceNumbersInFlight.contains(blobSeqId.ToSequentialNumber()),
+ "BlobSeqId# " << blobSeqId);
+ }
+ if (item.HasS3Locator()) {
+ auto locator = TS3Locator::FromProto(item.GetS3Locator());
+
+ // remove written item from the trash
+ NIceDb::TNiceDb(txc.DB).Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Delete();
+
+ // remove item from agent's inflight
+ const size_t numErased = agent.S3WritesInFlight.erase(locator);
+ Y_ABORT_UNLESS(numErased == 1);
+
+ Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_PUTS_OK] += 1;
+ Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_PUTS_BYTES] += locator.Len;
+ }
Self->Data->UpdateKey(key, item, txc, this);
}
}
@@ -152,41 +200,6 @@ namespace NKikimr::NBlobDepot {
return true;
}
- bool LoadMissingKeys(TTransactionContext& txc) {
- NIceDb::TNiceDb db(txc.DB);
- if (Self->Data->IsLoaded()) {
- return true;
- }
- for (const auto& item : Request->Get()->Record.GetItems()) {
- auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
- if (!Self->Data->EnsureKeyLoaded(key, txc)) {
- return false;
- }
- }
- return true;
- }
-
- bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) {
- const auto& v = key.AsVariant();
- if (const auto *id = std::get_if<TLogoBlobID>(&v)) {
- bool underSoft, underHard;
- Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard);
- if (underHard) {
- *error = TStringBuilder() << "under hard barrier# " << Self->BarrierServer->ToStringBarrier(
- id->TabletID(), id->Channel(), true);
- return false;
- } else if (underSoft) {
- const TData::TValue *value = Self->Data->FindKey(key);
- if (!value || value->KeepState != NKikimrBlobDepot::EKeepState::Keep) {
- *error = TStringBuilder() << "under soft barrier# " << Self->BarrierServer->ToStringBarrier(
- id->TabletID(), id->Channel(), false);
- return false;
- }
- }
- }
- return true;
- }
-
void Complete(const TActorContext&) override {
TAgent& agent = Self->GetAgent(NodeId);
for (const TBlobSeqId blobSeqId : BlobSeqIds) {
@@ -210,7 +223,9 @@ namespace NKikimr::NBlobDepot {
// FIXME(alexvru): delete uncertain keys containing this BlobSeqId as they were never written
- for (const auto& item : ev->Get()->Record.GetItems()) {
+ const auto& record = ev->Get()->Record;
+
+ for (const auto& item : record.GetItems()) {
const auto blobSeqId = TBlobSeqId::FromProto(item);
if (blobSeqId.Generation == generation) {
Y_ABORT_UNLESS(blobSeqId.Channel < Channels.size());
@@ -227,6 +242,14 @@ namespace NKikimr::NBlobDepot {
}
}
}
+
+ for (const auto& item : record.GetS3Locators()) {
+ if (const auto& locator = TS3Locator::FromProto(item); locator.Generation == generation) {
+ const size_t numErased = agent.S3WritesInFlight.erase(locator);
+ Y_ABORT_UNLESS(numErased == 1);
+ S3Manager->AddTrashToCollect(locator);
+ }
+ }
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/s3.cpp b/ydb/core/blob_depot/s3.cpp
new file mode 100644
index 00000000000..72ea67b62ae
--- /dev/null
+++ b/ydb/core/blob_depot/s3.cpp
@@ -0,0 +1,53 @@
+#include "s3.h"
+#include <ydb/core/wrappers/s3_wrapper.h>
+
+namespace NKikimr::NBlobDepot {
+
+ using TS3Manager = TBlobDepot::TS3Manager;
+
+ TS3Manager::TS3Manager(TBlobDepot *self)
+ : Self(self)
+ {}
+
+ TS3Manager::~TS3Manager() = default;
+
+ void TS3Manager::Init(const NKikimrBlobDepot::TS3BackendSettings *settings) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS05, "Init", (Settings, settings));
+ if (settings) {
+ ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings->GetSettings());
+ WrapperId = Self->Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+ BasePath = TStringBuilder() << settings->GetSettings().GetObjectKeyPattern() << '/' << Self->Config.GetName();
+ Bucket = settings->GetSettings().GetBucket();
+ SyncMode = settings->HasSyncMode();
+ AsyncMode = settings->HasAsyncMode();
+ RunScannerActor();
+ } else {
+ SyncMode = false;
+ AsyncMode = false;
+ }
+ }
+
+ void TS3Manager::TerminateAllActors() {
+ for (TActorId actorId : ActiveUploaders) {
+ Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, Self->SelfId(), nullptr, 0));
+ }
+ for (TActorId actorId : ActiveDeleters) {
+ Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, Self->SelfId(), nullptr, 0));
+ }
+ if (ScannerActorId) {
+ Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, ScannerActorId, Self->SelfId(), nullptr, 0));
+ }
+ }
+
+ void TS3Manager::Handle(TAutoPtr<IEventHandle> ev) {
+ STRICT_STFUNC_BODY(
+ fFunc(TEvPrivate::EvDeleteResult, HandleDeleter)
+ fFunc(TEvPrivate::EvScanFound, HandleScanner)
+ )
+ }
+
+ void TBlobDepot::InitS3Manager() {
+ S3Manager->Init(Config.HasS3BackendSettings() ? &Config.GetS3BackendSettings() : nullptr);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/s3.h b/ydb/core/blob_depot/s3.h
new file mode 100644
index 00000000000..9a18fb732dc
--- /dev/null
+++ b/ydb/core/blob_depot/s3.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+#include "data.h"
+
+#include <ydb/core/wrappers/abstract.h>
+
+namespace NKikimr::NBlobDepot {
+
+ class TBlobDepot::TS3Manager {
+ using TEvExternalStorage = NWrappers::TEvExternalStorage;
+
+ TBlobDepot* const Self;
+ NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
+ TActorId WrapperId;
+ TActorId UploaderId;
+ TString BasePath;
+ TString Bucket;
+
+ bool SyncMode = false;
+ bool AsyncMode = false;
+
+ ui64 NextKeyId = 1;
+
+ THashSet<TActorId> ActiveUploaders;
+
+ public:
+ TS3Manager(TBlobDepot *self);
+ ~TS3Manager();
+
+ void Init(const NKikimrBlobDepot::TS3BackendSettings *settings);
+ void TerminateAllActors();
+
+ void Handle(TAutoPtr<IEventHandle> ev);
+
+ void OnKeyWritten(const TData::TKey& key, const TValueChain& valueChain);
+
+ const TActorId& GetWrapperId() const { return WrapperId; }
+
+ void AddTrashToCollect(TS3Locator locator);
+
+ ui64 GetTotalS3TrashObjects() const { return TotalS3TrashObjects; }
+ ui64 GetTotalS3TrashSize() const { return TotalS3TrashSize; }
+
+ private: ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ class TTxPrepareWriteS3;
+ friend class TBlobDepot;
+
+ TS3Locator AllocateS3Locator(ui32 len);
+
+ private: ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ class TUploaderActor;
+ struct TEvUploadResult;
+
+ private: ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ class TScannerActor;
+ class TTxProcessScannedKeys;
+ struct TEvScanFound;
+
+ TActorId ScannerActorId;
+
+ void RunScannerActor();
+ void HandleScanner(TAutoPtr<IEventHandle> ev);
+
+ private: ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+ class TDeleterActor;
+ class TTxDeleteTrashS3;
+ struct TEvDeleteResult;
+
+ static constexpr ui32 MaxDeletesInFlight = 3;
+ static constexpr size_t MaxObjectsToDeleteAtOnce = 10;
+
+ std::deque<TS3Locator> DeleteQueue; // items we are definitely going to delete (must be present in TrashS3)
+ THashSet<TActorId> ActiveDeleters;
+ ui32 NumDeleteTxInFlight = 0;
+ ui64 TotalS3TrashObjects = 0;
+ ui64 TotalS3TrashSize = 0;
+
+ void RunDeletersIfNeeded();
+ void HandleDeleter(TAutoPtr<IEventHandle> ev);
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/s3_delete.cpp b/ydb/core/blob_depot/s3_delete.cpp
new file mode 100644
index 00000000000..5825d20c715
--- /dev/null
+++ b/ydb/core/blob_depot/s3_delete.cpp
@@ -0,0 +1,225 @@
+#include "s3.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TS3Manager = TBlobDepot::TS3Manager;
+
+ struct TS3Manager::TEvDeleteResult : TEventLocal<TEvDeleteResult, TEvPrivate::EvDeleteResult> {
+ std::vector<TS3Locator> LocatorsOk;
+ std::vector<TS3Locator> LocatorsError;
+
+ TEvDeleteResult(std::vector<TS3Locator>&& locatorsOk, std::vector<TS3Locator>&& locatorsError)
+ : LocatorsOk(std::move(locatorsOk))
+ , LocatorsError(std::move(locatorsError))
+ {}
+ };
+
+ class TS3Manager::TDeleterActor : public TActor<TDeleterActor> {
+ TActorId ParentId;
+ THashMap<TString, TS3Locator> Locators;
+ TString LogId;
+
+ public:
+ TDeleterActor(TActorId parentId, THashMap<TString, TS3Locator> locators, TString logId)
+ : TActor(&TThis::StateFunc)
+ , ParentId(parentId)
+ , Locators(locators)
+ , LogId(std::move(logId))
+ {}
+
+ void Handle(TEvExternalStorage::TEvDeleteObjectResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+ if (msg.IsSuccess()) {
+ Finish(std::nullopt);
+ } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
+ Finish(std::nullopt);
+ } else {
+ Finish(error.GetMessage().c_str());
+ }
+ }
+
+ void Handle(TEvExternalStorage::TEvDeleteObjectsResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+
+ std::vector<TS3Locator> locatorsOk;
+ std::vector<TS3Locator> locatorsError;
+
+ if (msg.IsSuccess()) {
+ auto& result = msg.Result.GetResult();
+ for (const Aws::S3::Model::DeletedObject& deleted : result.GetDeleted()) {
+ if (deleted.KeyHasBeenSet()) {
+ if (const auto it = Locators.find(deleted.GetKey().c_str()); it != Locators.end()) {
+ locatorsOk.push_back(it->second);
+ Locators.erase(it);
+ } else {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS09, "key not found", (Id, LogId),
+ (Key, deleted.KeyHasBeenSet() ? std::make_optional<TString>(deleted.GetKey().c_str()) : std::nullopt));
+ }
+ } else {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS10, "key not set", (Id, LogId));
+ }
+ }
+ for (const Aws::S3::Model::Error& error : result.GetErrors()) {
+ if (error.KeyHasBeenSet() && error.GetCode() == "NoSuchKey") { // this key has already been deleted
+ if (const auto it = Locators.find(error.GetKey().c_str()); it != Locators.end()) {
+ locatorsOk.push_back(it->second);
+ Locators.erase(it);
+ }
+ } else {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS11, "failed to delete object from S3", (Id, LogId),
+ (Key, error.KeyHasBeenSet() ? std::make_optional<TString>(error.GetKey().c_str()) : std::nullopt),
+ (Error, error.GetMessage().c_str()));
+ }
+ }
+ } else {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS12, "failed to delete object(s) from S3", (Id, LogId),
+ (Error, msg.GetError().GetMessage().c_str()));
+ }
+
+ for (const auto& [key, locator] : Locators) {
+ locatorsError.push_back(locator);
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS08, "failed to delete object from S3", (Id, LogId), (Locator, locator));
+ }
+
+ Send(ParentId, new TEvDeleteResult(std::move(locatorsOk), std::move(locatorsError)));
+ PassAway();
+ }
+
+ void HandleUndelivered() {
+ Finish("event undelivered");
+ }
+
+ void Finish(std::optional<TString> error) {
+ if (error) {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS03, "failed to delete object(s) from S3", (Id, LogId), (Locators, Locators),
+ (Error, error));
+ }
+ std::vector<TS3Locator> locatorsOk;
+ std::vector<TS3Locator> locatorsError;
+ auto *target = error ? &locatorsError : &locatorsOk;
+ for (const auto& [key, locator] : Locators) {
+ target->push_back(locator);
+ }
+ Send(ParentId, new TEvDeleteResult(std::move(locatorsOk), std::move(locatorsError)));
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvExternalStorage::TEvDeleteObjectResponse, Handle)
+ hFunc(TEvExternalStorage::TEvDeleteObjectsResponse, Handle)
+ cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+
+ class TS3Manager::TTxDeleteTrashS3 : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ std::vector<TS3Locator> Locators;
+
+ public:
+ TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DELETE_TRASH_S3; }
+
+ TTxDeleteTrashS3(TBlobDepot *self, std::vector<TS3Locator>&& locators)
+ : TTransactionBase(self)
+ , Locators(std::move(locators))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ for (const TS3Locator& locator : Locators) {
+ NIceDb::TNiceDb(txc.DB).Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Delete();
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ STLOG(PRI_INFO, BLOB_DEPOT, BDTS04, "TTxDeleteTrashS3 complete", (Id, Self->GetLogId()), (Locators, Locators));
+
+ size_t len = 0;
+ for (const TS3Locator& locator : Locators) {
+ len += locator.Len;
+ }
+
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] =
+ Self->S3Manager->TotalS3TrashObjects -= Locators.size();
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] =
+ Self->S3Manager->TotalS3TrashSize -= len;
+
+ --Self->S3Manager->NumDeleteTxInFlight;
+
+ Self->S3Manager->RunDeletersIfNeeded();
+ }
+ };
+
+ void TS3Manager::AddTrashToCollect(TS3Locator locator) {
+ STLOG(PRI_INFO, BLOB_DEPOT, BDTS06, "AddTrashToCollect", (Id, Self->GetLogId()), (Locator, locator));
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = ++TotalS3TrashObjects;
+ Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize += locator.Len;
+ DeleteQueue.push_back(locator);
+ RunDeletersIfNeeded();
+ }
+
+ void TS3Manager::RunDeletersIfNeeded() {
+ while (!DeleteQueue.empty() && NumDeleteTxInFlight + ActiveDeleters.size() < MaxDeletesInFlight) {
+ THashMap<TString, TS3Locator> locators;
+
+ while (!DeleteQueue.empty() && locators.size() < MaxObjectsToDeleteAtOnce) {
+ const TS3Locator& locator = DeleteQueue.front();
+ locators.emplace(locator.MakeObjectName(BasePath), locator);
+ DeleteQueue.pop_front();
+ }
+
+ const TActorId actorId = Self->Register(new TDeleterActor(Self->SelfId(), locators, Self->GetLogId()));
+ ActiveDeleters.insert(actorId);
+
+ if (locators.size() == 1) {
+ TActivationContext::Send(new IEventHandle(WrapperId, actorId,
+ new TEvExternalStorage::TEvDeleteObjectRequest(
+ Aws::S3::Model::DeleteObjectRequest()
+ .WithBucket(Bucket)
+ .WithKey(locators.begin()->first)
+ ),
+ IEventHandle::FlagTrackDelivery
+ ));
+ } else {
+ auto del = Aws::S3::Model::Delete();
+ for (const auto& [key, locator] : locators) {
+ del.AddObjects(Aws::S3::Model::ObjectIdentifier().WithKey(key));
+ }
+
+ TActivationContext::Send(new IEventHandle(WrapperId, actorId,
+ new TEvExternalStorage::TEvDeleteObjectsRequest(Aws::S3::Model::DeleteObjectsRequest()
+ .WithBucket(Bucket).WithDelete(std::move(del))), IEventHandle::FlagTrackDelivery));
+ }
+ }
+ }
+
+ void TS3Manager::HandleDeleter(TAutoPtr<IEventHandle> ev) {
+ STRICT_STFUNC_BODY(
+ hFunc(TEvDeleteResult, [&](TEvDeleteResult::TPtr ev) {
+ const size_t numErased = ActiveDeleters.erase(ev->Sender);
+ Y_ABORT_UNLESS(numErased == 1);
+
+ auto& msg = *ev->Get();
+
+ Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_DELETES_OK] += msg.LocatorsOk.size();
+ size_t len = 0;
+ for (const TS3Locator& locator : msg.LocatorsOk) {
+ len += locator.Len;
+ }
+ Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_DELETES_BYTES] += len;
+
+ Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_S3_DELETES_ERROR] += msg.LocatorsError.size();
+
+ if (!msg.LocatorsOk.empty()) {
+ Self->Execute(std::make_unique<TTxDeleteTrashS3>(Self, std::move(msg.LocatorsOk)));
+ ++NumDeleteTxInFlight;
+ }
+
+ if (!msg.LocatorsError.empty()) {
+ DeleteQueue.insert(DeleteQueue.end(), msg.LocatorsError.begin(), msg.LocatorsError.end());
+ RunDeletersIfNeeded();
+ }
+ })
+ )
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/s3_scan.cpp b/ydb/core/blob_depot/s3_scan.cpp
new file mode 100644
index 00000000000..f4dd43f190a
--- /dev/null
+++ b/ydb/core/blob_depot/s3_scan.cpp
@@ -0,0 +1,216 @@
+#include "s3.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TS3Manager = TBlobDepot::TS3Manager;
+
+ struct TS3Manager::TEvScanFound : TEventLocal<TEvScanFound, TEvPrivate::EvScanFound> {
+ std::vector<std::tuple<TString, ui64>> KeysWithoutPrefix;
+ bool IsFinal;
+ std::optional<TString> Error;
+
+ TEvScanFound(std::vector<std::tuple<TString, ui64>>&& keysWithoutPrefix, bool isFinal, std::optional<TString>&& error)
+ : KeysWithoutPrefix(std::move(keysWithoutPrefix))
+ , IsFinal(isFinal)
+ , Error(std::move(error))
+ {}
+ };
+
+ class TS3Manager::TScannerActor : public TActorBootstrapped<TScannerActor> {
+ const TActorId ParentId;
+ const TActorId WrapperId;
+ TString Prefix;
+ const TString Bucket;
+ const TString LogId;
+ std::optional<TString> Marker;
+
+ public:
+ TScannerActor(TActorId parentId, TActorId wrapperId, TString prefix, TString bucket, TString logId)
+ : ParentId(parentId)
+ , WrapperId(wrapperId)
+ , Prefix(std::move(prefix))
+ , Bucket(std::move(bucket))
+ , LogId(std::move(logId))
+ {
+ Prefix += '/';
+ }
+
+ void Bootstrap() {
+ IssueNextRequest();
+ Become(&TThis::StateFunc);
+ }
+
+ void IssueNextRequest() {
+ auto request = Aws::S3::Model::ListObjectsRequest()
+ .WithBucket(Bucket)
+ .WithPrefix(Prefix)
+ ;
+ if (Marker) {
+ request.SetMarker(*Marker);
+ }
+ request.SetMaxKeys(100);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS18, "TScannerActor::IssueNextRequest", (Id, LogId), (Prefix, Prefix),
+ (Marker, Marker));
+
+ Send(WrapperId, new TEvExternalStorage::TEvListObjectsRequest(request), IEventHandle::FlagTrackDelivery);
+ }
+
+ void Handle(TEvExternalStorage::TEvListObjectsResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+ if (!msg.IsSuccess()) {
+ FinishWithError(msg.GetError().GetMessage().c_str());
+ } else {
+ const auto& result = msg.Result.GetResult();
+ TString lastKey;
+ std::vector<std::tuple<TString, ui64>> keysWithoutPrefix;
+
+ for (const auto& item : result.GetContents()) {
+ if (!item.KeyHasBeenSet()) {
+ return FinishWithError("invalid response: no key set in listing");
+ } else if (!item.SizeHasBeenSet()) {
+ return FinishWithError(TStringBuilder() << "invalid response: no size for key " << item.GetKey());
+ }
+ lastKey = item.GetKey();
+ if (!lastKey.StartsWith(Prefix)) {
+ return FinishWithError("returned key does not start with specified prefix");
+ }
+ keysWithoutPrefix.emplace_back(lastKey.substr(Prefix.length()), item.GetSize());
+ }
+
+ const bool isFinal = !result.GetIsTruncated();
+
+ Send(ParentId, new TEvScanFound(std::move(keysWithoutPrefix), isFinal, std::nullopt));
+
+ if (isFinal) {
+ PassAway();
+ } else {
+ Marker.emplace(std::move(lastKey));
+ }
+ }
+ }
+
+ void HandleUndelivered() {
+ FinishWithError("event undelivered");
+ }
+
+ void FinishWithError(TString error) {
+ Send(ParentId, new TEvScanFound({}, true, std::move(error)));
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvExternalStorage::TEvListObjectsResponse, Handle)
+ cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
+ cFunc(TEvPrivate::EvScanContinue, IssueNextRequest)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+
+ class TS3Manager::TTxProcessScannedKeys : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ THashSet<TS3Locator> UnprocessedLocators;
+ THashSet<TS3Locator> LocatorsToDelete;
+
+ public:
+ TTxProcessScannedKeys(TBlobDepot *self, THashSet<TS3Locator>&& locators)
+ : TTransactionBase(self)
+ , UnprocessedLocators(std::move(locators))
+ {}
+
+ TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_PROCESS_SCANNED_KEYS; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+
+ for (auto it = UnprocessedLocators.begin(); it != UnprocessedLocators.end(); ) {
+ auto row = db.Table<Schema::TrashS3>().Key(it->Generation, it->KeyId).Select();
+ if (!row.IsReady()) {
+ return false;
+ } else if (!row.IsValid()) {
+ const bool useful = Self->Data->IsUseful(*it);
+ if (useful) {
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDTS13, "trying to delete useful S3 locator", (Id, Self->GetLogId()),
+ (Locator, *it));
+ Y_DEBUG_ABORT("trying to delete useful S3 locator");
+ } else {
+ LocatorsToDelete.insert(*it);
+ }
+ }
+ UnprocessedLocators.erase(it++);
+ }
+
+ Y_ABORT_UNLESS(UnprocessedLocators.empty());
+
+ for (const TS3Locator& locator : LocatorsToDelete) {
+ db.Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Update<Schema::TrashS3::Len>(locator.Len);
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ for (const auto& locator : LocatorsToDelete) {
+ Self->S3Manager->AddTrashToCollect(locator);
+ }
+ if (const auto& actorId = Self->S3Manager->ScannerActorId) {
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvScanContinue, 0, actorId, Self->SelfId(), nullptr, 0));
+ }
+ }
+ };
+
+ void TS3Manager::RunScannerActor() {
+ Y_ABORT_UNLESS(!ScannerActorId);
+ ScannerActorId = Self->Register(new TScannerActor(Self->SelfId(), WrapperId, BasePath, Bucket, Self->GetLogId()));
+ }
+
+ void TS3Manager::HandleScanner(TAutoPtr<IEventHandle> ev) {
+ STRICT_STFUNC_BODY(
+ hFunc(TEvScanFound, [&](TEvScanFound::TPtr ev) {
+ auto& msg = *ev->Get();
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS17, "TEvScanFound received", (Id, Self->GetLogId()),
+ (IsFinal, msg.IsFinal), (Error, msg.Error), (KeysWithoutPrefix.size, msg.KeysWithoutPrefix.size()));
+
+ Y_ABORT_UNLESS(ScannerActorId);
+ Y_ABORT_UNLESS(ev->Sender == ScannerActorId);
+
+ if (msg.Error) {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS14, "scanner error", (Id, Self->GetLogId()), (Error, msg.Error));
+ // TODO(alexvru): restart scanner in some time
+ }
+
+ THashSet<TS3Locator> trash;
+
+ const ui32 generation = Self->Executor()->Generation();
+
+ for (const auto& [key, len] : msg.KeysWithoutPrefix) {
+ TString error;
+ if (const auto& locator = TS3Locator::FromObjectName(key, len, &error)) {
+ const bool useful = Self->Data->IsUseful(*locator);
+ const bool allow = locator->Generation < generation;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS15, "TEvScanFound: found key", (Id, Self->GetLogId()),
+ (Locator, *locator), (Useful, useful), (Allow, allow), (Error, error));
+ if (!useful && allow) {
+ trash.insert(*locator);
+ }
+ } else {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDTS16, "TEvScanFound: incorrect key name", (Id, Self->GetLogId()),
+ (Key, key), (Len, len));
+ }
+ }
+
+ if (msg.IsFinal) {
+ ScannerActorId = {};
+ }
+
+ if (!trash.empty()) {
+ Self->Execute(std::make_unique<TTxProcessScannedKeys>(Self, std::move(trash)));
+ } else if (!msg.IsFinal) {
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvScanContinue, 0, ev->Sender, ev->Recipient,
+ nullptr, 0));
+ }
+ })
+ )
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/s3_upload.cpp b/ydb/core/blob_depot/s3_upload.cpp
new file mode 100644
index 00000000000..9877744cf68
--- /dev/null
+++ b/ydb/core/blob_depot/s3_upload.cpp
@@ -0,0 +1,95 @@
+#include "s3.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TS3Manager = TBlobDepot::TS3Manager;
+
+ struct TS3Manager::TEvUploadResult : TEventLocal<TEvUploadResult, TEvPrivate::EvUploadResult> {
+ };
+
+ class TS3Manager::TUploaderActor : public TActorBootstrapped<TUploaderActor> {
+ const TActorId WrapperId;
+ const TString BasePath;
+ const TString Bucket;
+ const TIntrusivePtr<TTabletStorageInfo> Info;
+ const TData::TKey Key;
+ const TValueChain ValueChain;
+ ui32 RepliesPending = 0;
+ TString Buffer;
+
+ public:
+ TUploaderActor(TActorId wrapperId, TString basePath, TString bucket, TIntrusivePtr<TTabletStorageInfo> info,
+ TData::TKey key, TValueChain valueChain)
+ : WrapperId(wrapperId)
+ , BasePath(std::move(basePath))
+ , Bucket(std::move(bucket))
+ , Info(std::move(info))
+ , Key(std::move(key))
+ , ValueChain(std::move(valueChain))
+ {}
+
+ void Bootstrap() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS00, "TUploaderActor::Bootstrap", (Key, Key), (ValueChain, ValueChain));
+ size_t targetOffset = 0;
+ EnumerateBlobsForValueChain(ValueChain, Info->TabletID, TOverloaded{
+ [&](TLogoBlobID id, ui32 shift, ui32 size) {
+ const ui32 groupId = Info->GroupFor(id.Channel(), id.Generation());
+ SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvGet(id, shift, size, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead), targetOffset);
+ ++RepliesPending;
+ targetOffset += size;
+ },
+ [&](TS3Locator) {
+ // already stored in S3, nothing to do
+ }
+ });
+ Buffer = TString::Uninitialized(targetOffset);
+ if (!RepliesPending) {
+ // don't have to read anything?
+ }
+ Become(&TThis::StateFunc);
+ }
+
+ void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
+ auto *msg = ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS01, "TUploaderActor::Handle(TEvGetResult)", (Msg, *msg));
+ if (msg->Status == NKikimrProto::OK && msg->ResponseSz == 1 && msg->Responses->Status == NKikimrProto::OK) {
+ TRope& rope = msg->Responses->Buffer;
+ char *ptr = Buffer.Detach() + ev->Cookie;
+ for (auto it = rope.Begin(); it.Valid(); it.AdvanceToNextContiguousBlock()) {
+ memcpy(ptr, it.ContiguousData(), it.ContiguousSize());
+ ptr += it.ContiguousSize();
+ }
+ if (!--RepliesPending) {
+ auto request = Aws::S3::Model::PutObjectRequest()
+ .WithBucket(Bucket)
+ .WithKey(TStringBuilder() << BasePath << '/' << Key.MakeTextualKey());
+ Send(WrapperId, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
+ }
+ } else {
+ }
+ }
+
+ void Handle(TEvExternalStorage::TEvPutObjectResponse::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS02, "TUploaderActor::Handle(TEvPutObjectResponse)", (Result, ev->Get()->Result));
+ if (auto& result = ev->Get()->Result; result.IsSuccess()) {
+ } else {
+ }
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvBlobStorage::TEvGetResult, Handle)
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, Handle)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+
+ void TS3Manager::OnKeyWritten(const TData::TKey& key, const TValueChain& valueChain) {
+ if (AsyncMode) {
+ ActiveUploaders.insert(Self->Register(new TUploaderActor(WrapperId, BasePath, Bucket, Self->Info(), key,
+ valueChain)));
+ }
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/s3_write.cpp b/ydb/core/blob_depot/s3_write.cpp
new file mode 100644
index 00000000000..0914aa21e7c
--- /dev/null
+++ b/ydb/core/blob_depot/s3_write.cpp
@@ -0,0 +1,118 @@
+#include "s3.h"
+#include "blocks.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TS3Manager = TBlobDepot::TS3Manager;
+
+ class TS3Manager::TTxPrepareWriteS3 : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const ui32 NodeId;
+ const ui64 AgentInstanceId;
+ std::unique_ptr<TEvBlobDepot::TEvPrepareWriteS3::THandle> Request;
+ std::unique_ptr<IEventHandle> Response;
+
+ public:
+ TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_PREPARE_WRITE_S3; }
+
+ TTxPrepareWriteS3(TBlobDepot *self, TAgent& agent, std::unique_ptr<TEvBlobDepot::TEvPrepareWriteS3::THandle> request)
+ : TTransactionBase(self)
+ , NodeId(agent.Connection->NodeId)
+ , AgentInstanceId(*agent.AgentInstanceId)
+ , Request(std::move(request))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ TAgent& agent = Self->GetAgent(NodeId);
+ if (!agent.Connection || agent.AgentInstanceId != AgentInstanceId) {
+ // agent has been disconnected while transaction was in queue -- do nothing
+ return true;
+ }
+
+ if (!Self->Data->LoadMissingKeys(Request->Get()->Record, txc)) {
+ // we haven't loaded all of the required keys
+ return false;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ NKikimrBlobDepot::TEvPrepareWriteS3Result *responseRecord;
+ std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request);
+
+ for (const auto& record = Request->Get()->Record; const auto& item : record.GetItems()) {
+ auto *responseItem = responseRecord->AddItems();
+
+ TString error;
+ if (NKikimrProto::EReplyStatus status = CheckItem(item, error); status != NKikimrProto::OK) {
+ // basic checks have failed (blocked, item was deleted, or something else)
+ responseItem->SetStatus(status);
+ responseItem->SetErrorReason(error);
+ } else {
+ responseItem->SetStatus(NKikimrProto::OK);
+
+ const TS3Locator locator = Self->S3Manager->AllocateS3Locator(item.GetLen());
+ locator.ToProto(responseItem->MutableS3Locator());
+
+ // we put it here until operation is completed; if tablet restarts and operation fails, then this
+ // key will be deleted
+ db.Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Update<Schema::TrashS3::Len>(locator.Len);
+
+ const bool inserted = agent.S3WritesInFlight.insert(locator).second;
+ Y_ABORT_UNLESS(inserted);
+ }
+ }
+
+ return true;
+ }
+
+ NKikimrProto::EReplyStatus CheckItem(const NKikimrBlobDepot::TEvPrepareWriteS3::TItem& item, TString& error) {
+ auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
+
+ bool blocksPass = std::visit(TOverloaded{
+ [](TStringBuf) { return true; },
+ [&](TLogoBlobID blobId) { return Self->BlocksManager->CheckBlock(blobId.TabletID(), blobId.Generation()); }
+ }, key.AsVariant());
+
+ for (const auto& extra : item.GetExtraBlockChecks()) {
+ if (!blocksPass) {
+ break;
+ }
+ blocksPass = Self->BlocksManager->CheckBlock(extra.GetTabletId(), extra.GetGeneration());
+ }
+
+ if (!blocksPass) {
+ error = "blocked";
+ return NKikimrProto::BLOCKED;
+ }
+
+ if (auto e = Self->Data->CheckKeyAgainstBarrier(key)) {
+ error = TStringBuilder() << "BlobId# " << key.ToString() << " is being put beyond the barrier: " << *e;
+ return NKikimrProto::ERROR;
+ }
+
+ return NKikimrProto::OK;
+ }
+
+ void Complete(const TActorContext&) override {
+ TActivationContext::Send(Response.release());
+ }
+ };
+
+ TS3Locator TS3Manager::AllocateS3Locator(ui32 len) {
+ return {
+ .Len = len,
+ .Generation = Self->Executor()->Generation(),
+ .KeyId = NextKeyId++,
+ };
+ }
+
+ void TBlobDepot::Handle(TEvBlobDepot::TEvPrepareWriteS3::TPtr ev) {
+ auto& agent = GetAgent(ev->Recipient);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS07, "TEvPrepareWriteS3", (Id, GetLogId()), (AgentId, agent.Connection->NodeId),
+ (Msg, ev->Get()->Record));
+
+ Execute(std::make_unique<TS3Manager::TTxPrepareWriteS3>(this, agent,
+ std::unique_ptr<TEvBlobDepot::TEvPrepareWriteS3::THandle>(ev.Release())));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index c19fe1a636e..2e655bc1d73 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -106,13 +106,25 @@ namespace NKikimr::NBlobDepot {
using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>;
};
+ struct TrashS3 : Table<7> {
+ struct Generation : Column<1, NScheme::NTypeIds::Uint32> {};
+ struct KeyId : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct Len : Column<3, NScheme::NTypeIds::Uint32> {};
+
+ using TKey = TableKey<Generation, KeyId>;
+ using TColumns = TableColumns<Generation, KeyId, Len>;
+
+ using Precharge = NoAutoPrecharge;
+ };
+
using TTables = SchemaTables<
Config,
Blocks,
Barriers,
Data,
Trash,
- GC
+ GC,
+ TrashS3
>;
using TSettings = SchemaSettings<
diff --git a/ydb/core/blob_depot/testing.cpp b/ydb/core/blob_depot/testing.cpp
index e94b873abb5..79758a7b7a2 100644
--- a/ydb/core/blob_depot/testing.cpp
+++ b/ydb/core/blob_depot/testing.cpp
@@ -41,13 +41,17 @@ namespace NKikimr::NBlobDepot {
const TData::TValue *value = Data->FindKey(key);
Y_ABORT_UNLESS(value); // key must exist
ui32 numDataBytes = 0;
- EnumerateBlobsForValueChain(value->ValueChain, TabletID(), [&](TLogoBlobID id, ui32, ui32 size) {
- const ui32 groupId = Info()->GroupFor(id.Channel(), id.Generation());
- const auto state = overseer.GetBlobState(groupId, id);
- Y_VERIFY_S(state == NTesting::EBlobState::CERTAINLY_WRITTEN,
- "UserId# " << userId.ToString() << " UserState# " << (int)userState
- << " Id# " << id.ToString() << " State# " << (int)state);
- numDataBytes += size;
+ EnumerateBlobsForValueChain(value->ValueChain, TabletID(), TOverloaded {
+ [&](TLogoBlobID id, ui32, ui32 size) {
+ const ui32 groupId = Info()->GroupFor(id.Channel(), id.Generation());
+ const auto state = overseer.GetBlobState(groupId, id);
+ Y_VERIFY_S(state == NTesting::EBlobState::CERTAINLY_WRITTEN,
+ "UserId# " << userId.ToString() << " UserState# " << (int)userState
+ << " Id# " << id.ToString() << " State# " << (int)state);
+ numDataBytes += size;
+ },
+ [&](TS3Locator) {
+ }
});
Y_ABORT_UNLESS(numDataBytes == userId.BlobSize());
break;
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index bbad5240829..ee53ec166e8 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -123,6 +123,82 @@ namespace NKikimr::NBlobDepot {
}
};
+ struct TS3Locator {
+ ui32 Len = 0;
+ ui32 Generation = 0;
+ ui64 KeyId = 0;
+
+ static TS3Locator FromProto(const NKikimrBlobDepot::TS3Locator& locator) {
+ return {
+ .Len = locator.GetLen(),
+ .Generation = locator.GetGeneration(),
+ .KeyId = locator.GetKeyId(),
+ };
+ }
+
+ void ToProto(NKikimrBlobDepot::TS3Locator *locator) const {
+ locator->SetLen(Len);
+ locator->SetGeneration(Generation);
+ locator->SetKeyId(KeyId);
+ }
+
+ void Output(IOutputStream& s) const {
+ s << '{' << Len << '@' << Generation << '.' << KeyId << '}';
+ }
+
+ TString ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+
+ TString MakeObjectName(const TString& basePath) const {
+ const size_t hash = THash()(*this);
+ const size_t a = hash % 36;
+ const size_t b = hash / 36 % 36;
+ static const char vec[] = "0123456789abcdefghijklmnopqrstuvwxyz";
+ return TStringBuilder() << basePath
+ << '/' << Generation
+ << '/' << vec[a]
+ << '/' << vec[b]
+ << '/' << KeyId;
+ }
+
+ static std::optional<TS3Locator> FromObjectName(const TString& name, ui64 len, TString *error) {
+ try {
+ if (len > Max<ui32>()) {
+ *error = "value is too long";
+ return std::nullopt;
+ }
+ ui32 generation;
+ ui64 keyId;
+ TString a, b;
+ Split(name, '/', generation, a, b, keyId);
+ TS3Locator res{
+ .Len = static_cast<ui32>(len),
+ .Generation = generation,
+ .KeyId = keyId,
+ };
+ if (res.MakeObjectName(TString()) != '/' + name) {
+ *error = "object name does not match";
+ return std::nullopt;
+ }
+ return res;
+ } catch (const std::exception& ex) {
+ *error = ex.what();
+ return std::nullopt;
+ }
+ }
+
+ struct THash {
+ size_t operator ()(const TS3Locator& x) const {
+ return MultiHash(x.Len, x.Generation, x.KeyId);
+ }
+ };
+
+ friend std::strong_ordering operator <=>(const TS3Locator&, const TS3Locator&) = default;
+ };
+
class TGivenIdRange {
static constexpr size_t BitsPerChunk = 256;
using TChunk = TBitMap<BitsPerChunk, ui64>;
@@ -165,16 +241,21 @@ namespace NKikimr::NBlobDepot {
template<typename TCallback>
void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) {
for (const auto& item : valueChain) {
- const auto& locator = item.GetLocator();
- const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
- if (locator.GetFooterLen() == 0) {
- callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
- } else if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) {
- callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
- callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0);
- } else {
- callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() +
- locator.GetFooterLen()), 0, locator.GetTotalDataLen());
+ if (item.HasBlobLocator()) {
+ const auto& locator = item.GetBlobLocator();
+ const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
+ if (locator.GetFooterLen() == 0) {
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
+ } else if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) {
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0);
+ } else {
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() +
+ locator.GetFooterLen()), 0, locator.GetTotalDataLen());
+ }
+ }
+ if (item.HasS3Locator()) {
+ callback(TS3Locator::FromProto(item.GetS3Locator()));
}
}
}
@@ -216,3 +297,6 @@ namespace NKikimr::NBlobDepot {
} while (false)
} // NKikimr::NBlobDepot
+
+template<> struct THash<NKikimr::NBlobDepot::TS3Locator> : NKikimr::NBlobDepot::TS3Locator::THash {};
+template<> struct std::hash<NKikimr::NBlobDepot::TS3Locator> : THash<NKikimr::NBlobDepot::TS3Locator> {};
diff --git a/ydb/core/blob_depot/ya.make b/ydb/core/blob_depot/ya.make
index 34642663934..42b8adf4d67 100644
--- a/ydb/core/blob_depot/ya.make
+++ b/ydb/core/blob_depot/ya.make
@@ -33,6 +33,12 @@ LIBRARY()
group_metrics_exchange.cpp
mon_main.cpp
mon_main.h
+ s3.cpp
+ s3_delete.cpp
+ s3_scan.cpp
+ s3_upload.cpp
+ s3_write.cpp
+ s3.h
space_monitor.cpp
space_monitor.h
testing.cpp
@@ -49,6 +55,7 @@ LIBRARY()
ydb/core/blobstorage/vdisk/common
ydb/core/tablet_flat
ydb/core/protos
+ ydb/core/wrappers
)
GENERATE_ENUM_SERIALIZATION(schema.h)
diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp
index c49349750e5..c6a6adc2683 100644
--- a/ydb/core/mind/bscontroller/virtual_group.cpp
+++ b/ydb/core/mind/bscontroller/virtual_group.cpp
@@ -95,6 +95,10 @@ namespace NKikimr::NBsController {
NKikimrBlobDepot::TBlobDepotConfig config;
config.SetVirtualGroupId(group->ID.GetRawId());
config.MutableChannelProfiles()->CopyFrom(cmd.GetChannelProfiles());
+ if (cmd.HasS3BackendSettings()) {
+ config.MutableS3BackendSettings()->CopyFrom(cmd.GetS3BackendSettings());
+ }
+ config.SetName(cmd.GetName());
const bool success = config.SerializeToString(&group->BlobDepotConfig.ConstructInPlace());
Y_ABORT_UNLESS(success);
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index c7fe27a8ad0..8f9c12cb033 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -25,10 +25,17 @@ message TBlobLocator {
optional uint32 FooterLen = 5;
}
+message TS3Locator {
+ optional uint32 Len = 1; // length of contained data
+ optional uint32 Generation = 2; // generation of tablet issued this object into S3
+ optional uint64 KeyId = 3; // key id unique within the generation
+}
+
message TValueChain {
- optional TBlobLocator Locator = 1;
+ optional TBlobLocator BlobLocator = 1;
optional uint64 SubrangeBegin = 2;
optional uint64 SubrangeEnd = 3;
+ optional TS3Locator S3Locator = 4; // if filled, then this blob is stored in S3
}
message TValue {
@@ -54,6 +61,7 @@ message TResolvedValueChain {
optional NKikimrProto.TLogoBlobID BlobId = 2;
optional uint32 SubrangeBegin = 3;
optional uint32 SubrangeEnd = 4;
+ optional TS3Locator S3Locator = 5;
}
@@ -87,6 +95,8 @@ message TEvRegisterAgentResult {
optional uint32 DecommitGroupId = 3;
optional NKikimrBlobStorage.TPDiskSpaceColor.E SpaceColor = 4;
optional float ApproximateFreeSpaceShare = 5;
+ optional NKikimrBlobDepot.TS3BackendSettings S3BackendSettings = 6;
+ optional string Name = 7;
}
message TEvAllocateIds {
@@ -161,12 +171,19 @@ message TEvCollectGarbageResult {
}
message TEvCommitBlobSeq {
+ message TExtraBlockCheck {
+ optional uint64 TabletId = 1;
+ optional uint32 Generation = 2;
+ }
+
message TItem {
optional TBlobLocator BlobLocator = 1; // GroupId and Generation are for validation purposes
optional bytes Key = 2;
optional bytes Meta = 3;
optional bool UncertainWrite = 4;
optional bool CommitNotify = 5;
+ optional TS3Locator S3Locator = 6; // if blob has been written to S3
+ repeated TExtraBlockCheck ExtraBlockChecks = 7;
}
repeated TItem Items = 1;
@@ -183,6 +200,7 @@ message TEvCommitBlobSeqResult {
message TEvDiscardSpoiledBlobSeq {
repeated TBlobSeqId Items = 1;
+ repeated TS3Locator S3Locators = 2;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -233,3 +251,28 @@ message TEvPushMetrics {
optional uint64 BytesRead = 1; // since last update
optional uint64 BytesWritten = 2; // since last update
}
+
+message TEvPrepareWriteS3 {
+ message TExtraBlockCheck {
+ optional uint64 TabletId = 1;
+ optional uint32 Generation = 2;
+ }
+
+ message TItem {
+ optional bytes Key = 1; // the key user is going to write
+ repeated TExtraBlockCheck ExtraBlockChecks = 2;
+ optional uint32 Len = 3; // length of the value
+ }
+
+ repeated TItem Items = 1;
+}
+
+message TEvPrepareWriteS3Result {
+ message TItem {
+ optional NKikimrProto.EReplyStatus Status = 1;
+ optional string ErrorReason = 2;
+ optional TS3Locator S3Locator = 3;
+ }
+
+ repeated TItem Items = 1;
+}
diff --git a/ydb/core/protos/blob_depot_config.proto b/ydb/core/protos/blob_depot_config.proto
index 0d63aec67a6..ab0ae975448 100644
--- a/ydb/core/protos/blob_depot_config.proto
+++ b/ydb/core/protos/blob_depot_config.proto
@@ -21,23 +21,34 @@ message TChannelProfile {
message TS3BackendSettings {
optional NKikimrSchemeOp.TS3Settings Settings = 1; // how to connect to S3 (mandatory field)
- // data (when written) is first stored in ordinary data channel and then asynchronously transferred to S3 storage by
- // BlobDepot tablet; amount of stored data can be controlled by following settings
- //
- // future operation may include modes when data is uploaded by agents directly to the storage
+ message TAsyncMode {
+ // data (when written) is first stored in ordinary data channel and then asynchronously transferred to S3 storage by
+ // BlobDepot tablet; amount of stored data can be controlled by following settings
+ //
+ // future operation may include modes when data is uploaded by agents directly to the storage
- // when do we fully stop processing new queries until some data is uploaded
- optional uint64 MaxPendingBytes = 2;
+ // when do we fully stop processing new queries until some data is uploaded
+ optional uint64 MaxPendingBytes = 2;
- // when do we start throttling (if set, this must be strictly less than MaxPendingBytes, which also must be set)
- optional uint64 ThrottleStartBytes = 3;
+ // when do we start throttling (if set, this must be strictly less than MaxPendingBytes, which also must be set)
+ optional uint64 ThrottleStartBytes = 3;
- // upload rate (bytes per second) at ThrottleStartBytes amount of data (which linearly drops to 0 as amount increases
- // up to MaxPendingBytes)
- optional uint64 ThrottleMaxBytesPerSecond = 4;
+ // upload rate (bytes per second) at ThrottleStartBytes amount of data (which linearly drops to 0 as amount increases
+ // up to MaxPendingBytes)
+ optional uint64 ThrottleMaxBytesPerSecond = 4;
- // number of concurrent puts
- optional uint32 AsyncUploadPutsInFlight = 5 [default = 1];
+ // number of concurrent puts
+ optional uint32 UploadPutsInFlight = 5 [default = 1];
+ }
+
+ message TSyncMode {
+ // data is written synchronously by agent
+ }
+
+ oneof Mode {
+ TAsyncMode AsyncMode = 2;
+ TSyncMode SyncMode = 3;
+ }
}
message TBlobDepotConfig {
@@ -49,4 +60,5 @@ message TBlobDepotConfig {
optional NKikimrHive.TEvCreateTablet HiveParams = 6; // extra hive parameters
optional uint64 TenantHiveId = 7;
optional TS3BackendSettings S3BackendSettings = 8; // if set, then this tablet stores all its data in S3 (except for some cache/inflight)
+ optional string Name = 9;
}
diff --git a/ydb/core/protos/blobstorage_config.proto b/ydb/core/protos/blobstorage_config.proto
index f1f3e71fab1..af6a8fa96d7 100644
--- a/ydb/core/protos/blobstorage_config.proto
+++ b/ydb/core/protos/blobstorage_config.proto
@@ -495,6 +495,7 @@ message TAllocateVirtualGroup {
}
repeated NKikimrBlobDepot.TChannelProfile ChannelProfiles = 5;
uint64 BlobDepotId = 6; // when the tablet is already created; for testing purposes only
+ optional NKikimrBlobDepot.TS3BackendSettings S3BackendSettings = 8; // if S3 is used as a backend for this instance
}
message TDecommitGroups {
diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto
index 9b32efdbf94..81d0c6cc4e4 100644
--- a/ydb/core/protos/counters_blob_depot.proto
+++ b/ydb/core/protos/counters_blob_depot.proto
@@ -9,6 +9,10 @@ enum ESimpleCounters {
COUNTER_TOTAL_STORED_TRASH_SIZE = 1 [(NKikimr.CounterOpts) = {Name: "TotalStoredTrashSize"}];
COUNTER_IN_FLIGHT_TRASH_SIZE = 2 [(NKikimr.CounterOpts) = {Name: "InFlightTrashSize"}];
COUNTER_BYTES_TO_DECOMMIT = 3 [(NKikimr.CounterOpts) = {Name: "BytesToDecommit"}];
+ COUNTER_TOTAL_S3_DATA_OBJECTS = 4 [(NKikimr.CounterOpts) = {Name: "TotalS3DataObjects"}];
+ COUNTER_TOTAL_S3_DATA_SIZE = 5 [(NKikimr.CounterOpts) = {Name: "TotalS3DataSize"}];
+ COUNTER_TOTAL_S3_TRASH_OBJECTS = 6 [(NKikimr.CounterOpts) = {Name: "TotalS3TrashObjects"}];
+ COUNTER_TOTAL_S3_TRASH_SIZE = 7 [(NKikimr.CounterOpts) = {Name: "TotalS3TrashSize"}];
}
enum ECumulativeCounters {
@@ -17,6 +21,12 @@ enum ECumulativeCounters {
COUNTER_PUTS_ERROR = 2 [(NKikimr.CounterOpts) = {Name: "Puts/Error"}];
COUNTER_DECOMMIT_GET_BYTES = 3 [(NKikimr.CounterOpts) = {Name: "Decommit/GetBytes"}];
COUNTER_DECOMMIT_PUT_OK_BYTES = 4 [(NKikimr.CounterOpts) = {Name: "Decommit/PutOkBytes"}];
+ COUNTER_S3_PUTS_OK = 5 [(NKikimr.CounterOpts) = {Name: "S3/Puts/Ok"}];
+ COUNTER_S3_PUTS_ERROR = 6 [(NKikimr.CounterOpts) = {Name: "S3/Puts/Error"}];
+ COUNTER_S3_PUTS_BYTES = 7 [(NKikimr.CounterOpts) = {Name: "S3/Puts/OkBytes"}];
+ COUNTER_S3_DELETES_OK = 8 [(NKikimr.CounterOpts) = {Name: "S3/Deletes/Ok"}];
+ COUNTER_S3_DELETES_ERROR = 9 [(NKikimr.CounterOpts) = {Name: "S3/Deletes/Error"}];
+ COUNTER_S3_DELETES_BYTES = 10 [(NKikimr.CounterOpts) = {Name: "S3/Deletes/OkBytes"}];
}
enum EPercentileCounters {
@@ -61,4 +71,7 @@ enum ETxTypes {
TXTYPE_COMMIT_BLOB_SEQ = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}];
TXTYPE_UPDATE_BLOCK = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}];
TXTYPE_HARD_GC = 16 [(NKikimr.TxTypeOpts) = {Name: "TTxHardGC"}];
+ TXTYPE_PREPARE_WRITE_S3 = 17 [(NKikimr.TxTypeOpts) = {Name: "TTxPrepareWriteS3"}];
+ TXTYPE_DELETE_TRASH_S3 = 18 [(NKikimr.TxTypeOpts) = {Name: "TTxDeleteTrashS3"}];
+ TXTYPE_PROCESS_SCANNED_KEYS = 19 [(NKikimr.TxTypeOpts) = {Name: "TTxProcessScannedKeys"}];
}
diff --git a/ydb/core/util/stlog.h b/ydb/core/util/stlog.h
index d35fefc4e9c..5cce2fd8420 100644
--- a/ydb/core/util/stlog.h
+++ b/ydb/core/util/stlog.h
@@ -104,6 +104,12 @@ namespace NKikimr::NStLog {
template<typename T> struct TIsIterable<NProtoBuf::RepeatedField<T>> { static constexpr bool value = true; };
template<typename T> struct TIsIterable<NProtoBuf::RepeatedPtrField<T>> { static constexpr bool value = true; };
+ template<typename T> struct TIsIterableKV { static constexpr bool value = false; };
+ template<typename... Ts> struct TIsIterableKV<THashMap<Ts...>> { static constexpr bool value = true; };
+ template<typename... Ts> struct TIsIterableKV<TMap<Ts...>> { static constexpr bool value = true; };
+ template<typename... Ts> struct TIsIterableKV<std::map<Ts...>> { static constexpr bool value = true; };
+ template<typename... Ts> struct TIsIterableKV<std::unordered_map<Ts...>> { static constexpr bool value = true; };
+
template<typename T> struct TIsIdWrapper { static constexpr bool value = false; };
template<typename TType, typename TTag> struct TIsIdWrapper<TIdWrapper<TType, TTag>> { static constexpr bool value = true; };
@@ -180,6 +186,19 @@ namespace NKikimr::NStLog {
OutputParam(s, *begin);
}
s << "]";
+ } else if constexpr (TIsIterableKV<Tx>::value) {
+ s << '{';
+ for (bool first = true; const auto& [k, v] : value) {
+ if (first) {
+ first = false;
+ } else {
+ s << ' ';
+ }
+ OutputParam(s, k);
+ s << ':';
+ OutputParam(s, v);
+ }
+ s << '}';
} else {
s << value;
}
@@ -212,6 +231,17 @@ namespace NKikimr::NStLog {
OutputParam(json, *begin);
}
json.CloseArray();
+ } else if constexpr (TIsIterableKV<Tx>::value) {
+ json.OpenArray();
+ for (const auto& [k, v] : value) {
+ json.OpenMap();
+ json.WriteKey("key");
+ OutputParam(json, k);
+ json.WriteKey("value");
+ OutputParam(json, v);
+ json.CloseMap();
+ }
+ json.CloseArray();
} else if constexpr (TIsIdWrapper<Tx>::value){
json.Write(value.GetRawId());
} else if constexpr (std::is_constructible_v<NJson::TJsonValue, Tx>) {