aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2025-03-12 15:58:05 +0300
committerGitHub <noreply@github.com>2025-03-12 12:58:05 +0000
commit544240601801ea481394f0c44fe0da68a1aa631e (patch)
treec18c3615e33072564474a672ab6a57023133be6b
parent70284e90bceb26efe3a11d069f9d7abd27e63106 (diff)
downloadydb-544240601801ea481394f0c44fe0da68a1aa631e.tar.gz
Fix Windows build (#15455)
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h14
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp13
-rw-r--r--ydb/core/blob_depot/agent/read.cpp107
-rw-r--r--ydb/core/blob_depot/agent/s3.cpp149
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp60
-rw-r--r--ydb/core/blob_depot/agent/ya.make10
-rw-r--r--ydb/core/blob_depot/s3.cpp5
-rw-r--r--ydb/core/blob_depot/s3.h5
-rw-r--r--ydb/core/blob_depot/s3_delete.cpp3
-rw-r--r--ydb/core/blob_depot/s3_scan.cpp3
-rw-r--r--ydb/core/blob_depot/s3_upload.cpp3
-rw-r--r--ydb/core/blob_depot/s3_windows_stub.cpp50
-rw-r--r--ydb/core/blob_depot/ya.make22
-rw-r--r--ydb/core/util/aws.cpp13
-rw-r--r--ydb/core/util/aws_windows_stub.cpp8
-rw-r--r--ydb/core/util/ya.make7
16 files changed, 291 insertions, 181 deletions
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 4cb90dac9b..8f0f08ff63 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -5,8 +5,6 @@
#include <ydb/core/protos/blob_depot_config.pb.h>
-#include <ydb/core/wrappers/abstract.h>
-
namespace NKikimr::NBlobDepot {
#define ENUMERATE_INCOMING_EVENTS(XX) \
@@ -360,11 +358,12 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColor = {};
float ApproximateFreeSpaceShare = 0.0f;
- NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
std::optional<NKikimrBlobDepot::TS3BackendSettings> S3BackendSettings;
TActorId S3WrapperId;
TString S3BasePath;
+ void InitS3(const TString& name);
+
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
void SetupCounters();
@@ -407,6 +406,9 @@ namespace NKikimr::NBlobDepot {
std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
ui32 BlockChecksRemain = 3;
+ struct TLifetimeToken {};
+ std::shared_ptr<TLifetimeToken> LifetimeToken;
+
static constexpr TDuration WatchdogDuration = TDuration::Seconds(10);
public:
@@ -426,10 +428,16 @@ namespace NKikimr::NBlobDepot {
virtual void OnRead(ui64 /*tag*/, TReadOutcome&& /*outcome*/) {}
virtual void OnIdAllocated(bool /*success*/) {}
virtual void OnDestroy(bool /*success*/) {}
+ virtual void OnPutS3ObjectResponse(std::optional<TString>&& /*error*/) { Y_ABORT(); }
NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, std::optional<ui32> generation,
ui32 *blockedGeneration = nullptr);
+ using TFinishCallback = std::function<void(std::optional<TString>, const char*)>;
+ void IssueReadS3(const TString& key, ui32 offset, ui32 len, TFinishCallback finish, ui64 readId);
+
+ TActorId IssueWriteS3(TString&& key, TRope&& buffer, TLogoBlobID id);
+
protected: // reading logic
struct TReadContext;
struct TReadArg {
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 40c03faf3d..f6fbae259a 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -1,8 +1,6 @@
#include "agent_impl.h"
#include "blocks.h"
-#include <ydb/core/wrappers/s3_wrapper.h>
-
namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
@@ -99,14 +97,9 @@ namespace NKikimr::NBlobDepot {
S3WrapperId = {};
}
- if (S3BackendSettings) {
- auto& settings = S3BackendSettings->GetSettings();
- ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings);
- S3WrapperId = Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
- S3BasePath = TStringBuilder() << settings.GetObjectKeyPattern() << '/' << msg.GetName();
- } else {
- ExternalStorageConfig = {};
- }
+#ifndef KIKIMR_DISABLE_S3_OPS
+ InitS3(msg.GetName());
+#endif
OnConnect();
}
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index 085c87cd10..935a8bf0b4 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -173,100 +173,29 @@ namespace NKikimr::NBlobDepot {
}
for (TS3ReadItem& item : s3items) {
- class TGetActor : public TActor<TGetActor> {
- size_t OutputOffset;
- std::shared_ptr<TReadContext> ReadContext;
- TQuery* const Query;
-
- TString AgentLogId;
- TString QueryId;
- ui64 ReadId;
-
- NMonitoring::TDynamicCounters::TCounterPtr GetBytesOk;
- NMonitoring::TDynamicCounters::TCounterPtr GetsOk;
- NMonitoring::TDynamicCounters::TCounterPtr GetsError;
-
- public:
- TGetActor(size_t outputOffset, std::shared_ptr<TReadContext> readContext, TQuery *query,
- NMonitoring::TDynamicCounters::TCounterPtr getBytesOk,
- NMonitoring::TDynamicCounters::TCounterPtr getsOk,
- NMonitoring::TDynamicCounters::TCounterPtr getsError)
- : TActor(&TThis::StateFunc)
- , OutputOffset(outputOffset)
- , ReadContext(std::move(readContext))
- , Query(query)
- , AgentLogId(query->Agent.LogId)
- , QueryId(query->GetQueryId())
- , ReadId(ReadContext->GetTag())
- , GetBytesOk(std::move(getBytesOk))
- , GetsOk(std::move(getsOk))
- , GetsError(std::move(getsError))
- {}
-
- void Handle(NWrappers::TEvExternalStorage::TEvGetObjectResponse::TPtr ev) {
- auto& msg = *ev->Get();
-
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA55, "received TEvGetObjectResponse",
- (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId),
- (Response, msg.Result), (BodyLen, std::size(msg.Body)));
-
- if (msg.IsSuccess()) {
- ++*GetsOk;
- *GetBytesOk += msg.Body.size();
- } else {
- ++*GetsError;
- }
-
- if (msg.IsSuccess()) {
- Finish(std::move(msg.Body), "");
- } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
- Finish(std::nullopt, "data has disappeared from S3");
- } else {
- Finish(std::nullopt, msg.GetError().GetMessage().c_str());
- }
- }
-
- void HandleUndelivered() {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA56, "received TEvUndelivered",
- (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId));
- Finish(std::nullopt, "wrapper actor terminated");
- }
-
- void Finish(std::optional<TString> data, const char *error) {
- auto& context = *ReadContext;
- if (!context.Terminated && !context.StopProcessingParts) {
- if (data) {
- context.Buffer.Write(OutputOffset, TRope(std::move(*data)));
- if (!--context.NumPartsPending) {
- context.EndWithSuccess(Query);
- }
- } else {
- context.EndWithError(Query, NKikimrProto::ERROR, TStringBuilder()
- << "failed to fetch data from S3: " << error);
+#ifndef KIKIMR_DISABLE_S3_OPS
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA57, "starting S3 read", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, context->GetTag()), (Key, item.Key), (Offset, item.Offset), (Size, item.Size),
+ (OutputOffset, item.OutputOffset));
+ auto finish = [contextPtr = context, outputOffset = item.OutputOffset, this](std::optional<TString> data, const char *error) {
+ auto& context = *contextPtr;
+ if (!context.Terminated && !context.StopProcessingParts) {
+ if (data) {
+ context.Buffer.Write(outputOffset, TRope(std::move(*data)));
+ if (!--context.NumPartsPending) {
+ context.EndWithSuccess(this);
}
+ } else {
+ context.EndWithError(this, NKikimrProto::ERROR, TStringBuilder()
+ << "failed to fetch data from S3: " << error);
}
- PassAway();
}
-
- STRICT_STFUNC(StateFunc,
- hFunc(NWrappers::TEvExternalStorage::TEvGetObjectResponse, Handle)
- cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
- cFunc(TEvents::TSystem::Poison, PassAway)
- )
};
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA57, "starting S3 read", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, context->GetTag()), (Key, item.Key), (Offset, item.Offset), (Size, item.Size),
- (OutputOffset, item.OutputOffset));
- const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(item.OutputOffset, context, this,
- Agent.S3GetBytesOk, Agent.S3GetsOk, Agent.S3GetsError));
- auto request = std::make_unique<NWrappers::TEvExternalStorage::TEvGetObjectRequest>(
- Aws::S3::Model::GetObjectRequest()
- .WithBucket(Agent.S3BackendSettings->GetSettings().GetBucket())
- .WithKey(std::move(item.Key))
- .WithRange(TStringBuilder() << "bytes=" << item.Offset << '-' << item.Offset + item.Size - 1)
- );
- TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, actorId, request.release(), IEventHandle::FlagTrackDelivery));
+ IssueReadS3(item.Key, item.Offset, item.Size, finish, context->GetTag());
++context->NumPartsPending;
+#else
+ Y_ABORT("S3 is not supported");
+#endif
}
Y_ABORT_UNLESS(context->NumPartsPending);
diff --git a/ydb/core/blob_depot/agent/s3.cpp b/ydb/core/blob_depot/agent/s3.cpp
new file mode 100644
index 0000000000..8979ca22ec
--- /dev/null
+++ b/ydb/core/blob_depot/agent/s3.cpp
@@ -0,0 +1,149 @@
+#include "agent_impl.h"
+
+#include <ydb/core/wrappers/abstract.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+
+namespace NKikimr::NBlobDepot {
+
+ void TBlobDepotAgent::InitS3(const TString& name) {
+ if (S3BackendSettings) {
+ auto& settings = S3BackendSettings->GetSettings();
+ auto externalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings);
+ S3WrapperId = Register(NWrappers::CreateS3Wrapper(externalStorageConfig->ConstructStorageOperator()));
+ S3BasePath = TStringBuilder() << settings.GetObjectKeyPattern() << '/' << name;
+ }
+ }
+
+ void TBlobDepotAgent::TQuery::IssueReadS3(const TString& key, ui32 offset, ui32 len, TFinishCallback finish, ui64 readId) {
+ class TGetActor : public TActor<TGetActor> {
+ TFinishCallback Finish;
+
+ TString AgentLogId;
+ TString QueryId;
+ ui64 ReadId;
+
+ NMonitoring::TDynamicCounters::TCounterPtr GetBytesOk;
+ NMonitoring::TDynamicCounters::TCounterPtr GetsOk;
+ NMonitoring::TDynamicCounters::TCounterPtr GetsError;
+
+ public:
+ TGetActor(TQuery *query, TFinishCallback finish, ui64 readId,
+ NMonitoring::TDynamicCounters::TCounterPtr getBytesOk,
+ NMonitoring::TDynamicCounters::TCounterPtr getsOk,
+ NMonitoring::TDynamicCounters::TCounterPtr getsError)
+ : TActor(&TThis::StateFunc)
+ , Finish(std::move(finish))
+ , AgentLogId(query->Agent.LogId)
+ , QueryId(query->GetQueryId())
+ , ReadId(readId)
+ , GetBytesOk(std::move(getBytesOk))
+ , GetsOk(std::move(getsOk))
+ , GetsError(std::move(getsError))
+ {}
+
+ void Handle(NWrappers::TEvExternalStorage::TEvGetObjectResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA55, "received TEvGetObjectResponse",
+ (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId),
+ (Response, msg.Result), (BodyLen, std::size(msg.Body)));
+
+ if (msg.IsSuccess()) {
+ ++*GetsOk;
+ *GetBytesOk += msg.Body.size();
+ } else {
+ ++*GetsError;
+ }
+
+ if (msg.IsSuccess()) {
+ Finish(std::move(msg.Body), "");
+ } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
+ Finish(std::nullopt, "data has disappeared from S3");
+ } else {
+ Finish(std::nullopt, msg.GetError().GetMessage().c_str());
+ }
+ PassAway();
+ }
+
+ void HandleUndelivered() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA56, "received TEvUndelivered",
+ (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId));
+ Finish(std::nullopt, "wrapper actor terminated");
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NWrappers::TEvExternalStorage::TEvGetObjectResponse, Handle)
+ cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+ const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(this, std::move(finish), readId,
+ Agent.S3GetBytesOk, Agent.S3GetsOk, Agent.S3GetsError));
+ auto request = std::make_unique<NWrappers::TEvExternalStorage::TEvGetObjectRequest>(
+ Aws::S3::Model::GetObjectRequest()
+ .WithBucket(Agent.S3BackendSettings->GetSettings().GetBucket())
+ .WithKey(std::move(key))
+ .WithRange(TStringBuilder() << "bytes=" << offset << '-' << offset + len - 1)
+ );
+ TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, actorId, request.release(), IEventHandle::FlagTrackDelivery));
+ }
+
+ TActorId TBlobDepotAgent::TQuery::IssueWriteS3(TString&& key, TRope&& buffer, TLogoBlobID id) {
+ class TWriteActor : public TActor<TWriteActor> {
+ std::weak_ptr<TLifetimeToken> LifetimeToken;
+ TQuery* const Query;
+
+ public:
+ TWriteActor(std::weak_ptr<TLifetimeToken> lifetimeToken, TQuery *query)
+ : TActor(&TThis::StateFunc)
+ , LifetimeToken(std::move(lifetimeToken))
+ , Query(query)
+ {}
+
+ void Handle(NWrappers::TEvExternalStorage::TEvPutObjectResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+ Finish(msg.IsSuccess()
+ ? std::nullopt
+ : std::make_optional<TString>(msg.GetError().GetMessage()));
+ }
+
+ void HandleUndelivered() {
+ Finish("event undelivered");
+ }
+
+ void Finish(std::optional<TString>&& error) {
+ if (!LifetimeToken.expired()) {
+ InvokeOtherActor(Query->Agent, &TBlobDepotAgent::Invoke, [&] {
+ Query->OnPutS3ObjectResponse(std::move(error));
+ });
+ }
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NWrappers::TEvExternalStorage::TEvPutObjectResponse, Handle)
+ cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+ };
+
+ if (!LifetimeToken) {
+ LifetimeToken = std::make_shared<TLifetimeToken>();
+ }
+
+ const TActorId writerActorId = Agent.RegisterWithSameMailbox(new TWriteActor(LifetimeToken, this));
+
+ TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, writerActorId,
+ new NWrappers::TEvExternalStorage::TEvPutObjectRequest(
+ Aws::S3::Model::PutObjectRequest()
+ .WithBucket(std::move(Agent.S3BackendSettings->GetSettings().GetBucket()))
+ .WithKey(std::move(key))
+ .AddMetadata("key", id.ToString()),
+ buffer.ExtractUnderlyingContainerOrCopy<TString>()),
+ IEventHandle::FlagTrackDelivery));
+
+ return writerActorId;
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 17ece91bf3..54f02cacf4 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -20,9 +20,6 @@ namespace NKikimr::NBlobDepot {
TActorId WriterActorId;
ui64 ConnectionInstanceOnStart;
- struct TLifetimeToken {};
- std::shared_ptr<TLifetimeToken> LifetimeToken;
-
public:
using TBlobStorageQuery::TBlobStorageQuery;
@@ -345,6 +342,7 @@ namespace NKikimr::NBlobDepot {
}
void HandlePrepareWriteS3Result(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvPrepareWriteS3Result& msg) {
+#ifndef KIKIMR_DISABLE_S3_OPS
Y_ABORT_UNLESS(msg.ItemsSize() == 1);
const auto& item = msg.GetItems(0);
if (item.GetStatus() != NKikimrProto::OK) {
@@ -357,65 +355,21 @@ namespace NKikimr::NBlobDepot {
LocatorInFlight.emplace(TS3Locator::FromProto(*locator));
- class TWriteActor : public TActor<TWriteActor> {
- std::weak_ptr<TLifetimeToken> LifetimeToken;
- TPutQuery* const PutQuery;
-
- public:
- TWriteActor(std::weak_ptr<TLifetimeToken> lifetimeToken, TPutQuery *putQuery)
- : TActor(&TThis::StateFunc)
- , LifetimeToken(std::move(lifetimeToken))
- , PutQuery(putQuery)
- {}
-
- void Handle(NWrappers::TEvExternalStorage::TEvPutObjectResponse::TPtr ev) {
- auto& msg = *ev->Get();
- Finish(msg.IsSuccess()
- ? std::nullopt
- : std::make_optional<TString>(msg.GetError().GetMessage()));
- }
-
- void HandleUndelivered() {
- Finish("event undelivered");
- }
-
- void Finish(std::optional<TString>&& error) {
- if (!LifetimeToken.expired()) {
- InvokeOtherActor(PutQuery->Agent, &TBlobDepotAgent::Invoke, [&] {
- PutQuery->OnPutS3ObjectResponse(std::move(error));
- });
- }
- PassAway();
- }
-
- STRICT_STFUNC(StateFunc,
- hFunc(NWrappers::TEvExternalStorage::TEvPutObjectResponse, Handle)
- cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
- cFunc(TEvents::TSystem::Poison, PassAway)
- )
- };
-
TString key = TS3Locator::FromProto(*locator).MakeObjectName(Agent.S3BasePath);
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA54, "starting WriteActor", (AgentId, Agent.LogId),
(QueryId, GetQueryId()), (Key, key));
- LifetimeToken = std::make_shared<TLifetimeToken>();
- WriterActorId = Agent.RegisterWithSameMailbox(new TWriteActor(LifetimeToken, this));
-
- TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, WriterActorId,
- new NWrappers::TEvExternalStorage::TEvPutObjectRequest(
- Aws::S3::Model::PutObjectRequest()
- .WithBucket(std::move(Agent.S3BackendSettings->GetSettings().GetBucket()))
- .WithKey(std::move(key))
- .AddMetadata("key", Request.Id.ToString()),
- Request.Buffer.ExtractUnderlyingContainerOrCopy<TString>()),
- IEventHandle::FlagTrackDelivery));
+ WriterActorId = IssueWriteS3(std::move(key), std::move(Request.Buffer), Request.Id);
ConnectionInstanceOnStart = Agent.ConnectionInstance;
+#else
+ Y_UNUSED(msg);
+ Y_ABORT("S3 is not supported");
+#endif
}
- void OnPutS3ObjectResponse(std::optional<TString>&& error) {
+ void OnPutS3ObjectResponse(std::optional<TString>&& error) override {
STLOG(error ? PRI_WARN : PRI_DEBUG, BLOB_DEPOT_AGENT, BDA53, "OnPutS3ObjectResponse",
(AgentId, Agent.LogId), (QueryId, GetQueryId()), (Error, error));
diff --git a/ydb/core/blob_depot/agent/ya.make b/ydb/core/blob_depot/agent/ya.make
index 515da83e26..ae0a6b0915 100644
--- a/ydb/core/blob_depot/agent/ya.make
+++ b/ydb/core/blob_depot/agent/ya.make
@@ -1,5 +1,15 @@
LIBRARY()
+ IF (OS_WINDOWS)
+ CFLAGS(
+ -DKIKIMR_DISABLE_S3_OPS
+ )
+ ELSE()
+ SRCS(
+ s3.cpp
+ )
+ ENDIF()
+
SRCS(
agent.cpp
agent.h
diff --git a/ydb/core/blob_depot/s3.cpp b/ydb/core/blob_depot/s3.cpp
index 72ea67b62a..95f940b415 100644
--- a/ydb/core/blob_depot/s3.cpp
+++ b/ydb/core/blob_depot/s3.cpp
@@ -1,4 +1,5 @@
#include "s3.h"
+
#include <ydb/core/wrappers/s3_wrapper.h>
namespace NKikimr::NBlobDepot {
@@ -14,8 +15,8 @@ namespace NKikimr::NBlobDepot {
void TS3Manager::Init(const NKikimrBlobDepot::TS3BackendSettings *settings) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDTS05, "Init", (Settings, settings));
if (settings) {
- ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings->GetSettings());
- WrapperId = Self->Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+ auto externalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings->GetSettings());
+ WrapperId = Self->Register(NWrappers::CreateS3Wrapper(externalStorageConfig->ConstructStorageOperator()));
BasePath = TStringBuilder() << settings->GetSettings().GetObjectKeyPattern() << '/' << Self->Config.GetName();
Bucket = settings->GetSettings().GetBucket();
SyncMode = settings->HasSyncMode();
diff --git a/ydb/core/blob_depot/s3.h b/ydb/core/blob_depot/s3.h
index 9a18fb732d..8dcad947ec 100644
--- a/ydb/core/blob_depot/s3.h
+++ b/ydb/core/blob_depot/s3.h
@@ -4,15 +4,10 @@
#include "blob_depot_tablet.h"
#include "data.h"
-#include <ydb/core/wrappers/abstract.h>
-
namespace NKikimr::NBlobDepot {
class TBlobDepot::TS3Manager {
- using TEvExternalStorage = NWrappers::TEvExternalStorage;
-
TBlobDepot* const Self;
- NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
TActorId WrapperId;
TActorId UploaderId;
TString BasePath;
diff --git a/ydb/core/blob_depot/s3_delete.cpp b/ydb/core/blob_depot/s3_delete.cpp
index 5825d20c71..913c598729 100644
--- a/ydb/core/blob_depot/s3_delete.cpp
+++ b/ydb/core/blob_depot/s3_delete.cpp
@@ -1,8 +1,11 @@
#include "s3.h"
+#include <ydb/core/wrappers/abstract.h>
+
namespace NKikimr::NBlobDepot {
using TS3Manager = TBlobDepot::TS3Manager;
+ using TEvExternalStorage = NWrappers::TEvExternalStorage;
struct TS3Manager::TEvDeleteResult : TEventLocal<TEvDeleteResult, TEvPrivate::EvDeleteResult> {
std::vector<TS3Locator> LocatorsOk;
diff --git a/ydb/core/blob_depot/s3_scan.cpp b/ydb/core/blob_depot/s3_scan.cpp
index f4dd43f190..53bda9eba6 100644
--- a/ydb/core/blob_depot/s3_scan.cpp
+++ b/ydb/core/blob_depot/s3_scan.cpp
@@ -1,8 +1,11 @@
#include "s3.h"
+#include <ydb/core/wrappers/abstract.h>
+
namespace NKikimr::NBlobDepot {
using TS3Manager = TBlobDepot::TS3Manager;
+ using TEvExternalStorage = NWrappers::TEvExternalStorage;
struct TS3Manager::TEvScanFound : TEventLocal<TEvScanFound, TEvPrivate::EvScanFound> {
std::vector<std::tuple<TString, ui64>> KeysWithoutPrefix;
diff --git a/ydb/core/blob_depot/s3_upload.cpp b/ydb/core/blob_depot/s3_upload.cpp
index 9877744cf6..22d193b0e2 100644
--- a/ydb/core/blob_depot/s3_upload.cpp
+++ b/ydb/core/blob_depot/s3_upload.cpp
@@ -1,8 +1,11 @@
#include "s3.h"
+#include <ydb/core/wrappers/abstract.h>
+
namespace NKikimr::NBlobDepot {
using TS3Manager = TBlobDepot::TS3Manager;
+ using TEvExternalStorage = NWrappers::TEvExternalStorage;
struct TS3Manager::TEvUploadResult : TEventLocal<TEvUploadResult, TEvPrivate::EvUploadResult> {
};
diff --git a/ydb/core/blob_depot/s3_windows_stub.cpp b/ydb/core/blob_depot/s3_windows_stub.cpp
new file mode 100644
index 0000000000..26d11f74ed
--- /dev/null
+++ b/ydb/core/blob_depot/s3_windows_stub.cpp
@@ -0,0 +1,50 @@
+#include "s3.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TS3Manager = TBlobDepot::TS3Manager;
+
+ TS3Manager::TS3Manager(TBlobDepot *self)
+ : Self(self)
+ {}
+
+ TS3Manager::~TS3Manager() = default;
+
+ void TS3Manager::Init(const NKikimrBlobDepot::TS3BackendSettings *settings) {
+ if (settings) {
+ Y_ABORT("S3 is not supported on Windows");
+ } else {
+ SyncMode = false;
+ AsyncMode = false;
+ }
+ }
+
+ void TS3Manager::TerminateAllActors() {}
+
+ void TS3Manager::Handle(TAutoPtr<IEventHandle> /*ev*/) {
+ Y_ABORT("S3 is not supported on Windows");
+ }
+
+ void TS3Manager::AddTrashToCollect(TS3Locator /*locator*/) {
+ Y_ABORT("S3 is not supported on Windows");
+ }
+
+ void TS3Manager::HandleDeleter(TAutoPtr<IEventHandle> /*ev*/) {
+ Y_ABORT("S3 is not supported on Windows");
+ }
+
+ void TS3Manager::HandleScanner(TAutoPtr<IEventHandle> /*ev*/) {
+ Y_ABORT("S3 is not supported on Windows");
+ }
+
+ void TS3Manager::OnKeyWritten(const TData::TKey& /*key*/, const TValueChain& /*valueChain*/) {}
+
+ void TBlobDepot::Handle(TEvBlobDepot::TEvPrepareWriteS3::TPtr /*ev*/) {
+ Y_ABORT("S3 is not supported on Windows");
+ }
+
+ void TBlobDepot::InitS3Manager() {
+ S3Manager->Init(Config.HasS3BackendSettings() ? &Config.GetS3BackendSettings() : nullptr);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/ya.make b/ydb/core/blob_depot/ya.make
index 42b8adf4d6..1ec0805adf 100644
--- a/ydb/core/blob_depot/ya.make
+++ b/ydb/core/blob_depot/ya.make
@@ -1,5 +1,22 @@
LIBRARY()
+ IF (OS_WINDOWS)
+ CFLAGS(
+ -DKIKIMR_DISABLE_S3_OPS
+ )
+ SRCS(
+ s3_windows_stub.cpp
+ )
+ ELSE()
+ SRCS(
+ s3.cpp
+ s3_delete.cpp
+ s3_scan.cpp
+ s3_upload.cpp
+ s3_write.cpp
+ )
+ ENDIF()
+
SRCS(
blob_depot.cpp
blob_depot.h
@@ -33,11 +50,6 @@ LIBRARY()
group_metrics_exchange.cpp
mon_main.cpp
mon_main.h
- s3.cpp
- s3_delete.cpp
- s3_scan.cpp
- s3_upload.cpp
- s3_write.cpp
s3.h
space_monitor.cpp
space_monitor.h
diff --git a/ydb/core/util/aws.cpp b/ydb/core/util/aws.cpp
index 9346a824cd..b26d100d7a 100644
--- a/ydb/core/util/aws.cpp
+++ b/ydb/core/util/aws.cpp
@@ -1,7 +1,5 @@
#include "aws.h"
-#ifndef KIKIMR_DISABLE_S3_OPS
-
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/internal/AWSHttpResourceClient.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/Aws.h>
#include <contrib/libs/curl/include/curl/curl.h>
@@ -26,14 +24,3 @@ void ShutdownAwsAPI() {
}
} // NKikimr
-
-#else
-
-namespace NKikimr {
-
-void InitAwsAPI() {}
-void ShutdownAwsAPI() {}
-
-} // NKikimr
-
-#endif
diff --git a/ydb/core/util/aws_windows_stub.cpp b/ydb/core/util/aws_windows_stub.cpp
new file mode 100644
index 0000000000..ecefb946d3
--- /dev/null
+++ b/ydb/core/util/aws_windows_stub.cpp
@@ -0,0 +1,8 @@
+#include "aws.h"
+
+namespace NKikimr {
+
+void InitAwsAPI() {}
+void ShutdownAwsAPI() {}
+
+} // NKikimr
diff --git a/ydb/core/util/ya.make b/ydb/core/util/ya.make
index e2267ccadd..e7a1a31e2c 100644
--- a/ydb/core/util/ya.make
+++ b/ydb/core/util/ya.make
@@ -3,7 +3,6 @@ LIBRARY()
SRCS(
activeactors.h
address_classifier.cpp
- aws.cpp
backoff.cpp
cache.cpp
cache.h
@@ -84,11 +83,17 @@ IF (OS_WINDOWS)
CFLAGS(
-DKIKIMR_DISABLE_S3_OPS
)
+ SRCS(
+ aws_windows_stub.cpp
+ )
ELSE()
PEERDIR(
contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
contrib/libs/curl
)
+ SRCS(
+ aws.cpp
+ )
ENDIF()
END()