aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ilnaz <ilnaz@ydb.tech> 2023-12-06 16:03:20 +0300
committer ilnaz <ilnaz@ydb.tech> 2023-12-06 17:12:02 +0300
commit 658905d984103f8f38606af7e82fce0922e10eea (patch)
tree c9ce6cafef3cb6da424a2a88b2c81961c30eefa9
parent 1c561659d3a0f1fcc89a3cd615ee731f0e2bcf2c (diff)
download ydb-658905d984103f8f38606af7e82fce0922e10eea.tar.gz
S3 proxy resolver: config & impl KIKIMR-16190
-rw-r--r-- ydb/core/base/appdata.cpp 2
-rw-r--r-- ydb/core/base/appdata_fwd.h 3
-rw-r--r-- ydb/core/driver_lib/run/run.cpp 4
-rw-r--r-- ydb/core/protos/config.proto 20
-rw-r--r-- ydb/core/protos/console_config.proto 1
-rw-r--r-- ydb/core/testlib/actors/test_runtime.cpp 1
-rw-r--r-- ydb/core/testlib/basics/appdata.cpp 1
-rw-r--r-- ydb/core/testlib/basics/appdata.h 1
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt 2
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt 2
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt 2
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt 2
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt 2
-rw-r--r-- ydb/core/tx/datashard/export_s3_base_uploader.h 548
-rw-r--r-- ydb/core/tx/datashard/export_s3_uploader.cpp 647
-rw-r--r-- ydb/core/tx/datashard/ya.make 1
16 files changed, 683 insertions, 556 deletions
diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp
index 90086cd905..8fb299a65d 100644
--- a/ydb/core/base/appdata.cpp
+++ b/ydb/core/base/appdata.cpp
@@ -74,6 +74,7 @@ TAppData::TAppData(
, DomainsConfigPtr(new NKikimrConfig::TDomainsConfig())
, BootstrapConfigPtr(new NKikimrConfig::TBootstrap())
, AwsCompatibilityConfigPtr(new NKikimrConfig::TAwsCompatibilityConfig())
+ , S3ProxyResolverConfigPtr(new NKikimrConfig::TS3ProxyResolverConfig())
, StreamingConfig(*StreamingConfigPtr.get())
, PQConfig(*PQConfigPtr.get())
, PQClusterDiscoveryConfig(*PQClusterDiscoveryConfigPtr.get())
@@ -94,6 +95,7 @@ TAppData::TAppData(
, DomainsConfig(*DomainsConfigPtr.get())
, BootstrapConfig(*BootstrapConfigPtr.get())
, AwsCompatibilityConfig(*AwsCompatibilityConfigPtr.get())
+ , S3ProxyResolverConfig(*S3ProxyResolverConfigPtr.get())
, KikimrShouldContinue(kikimrShouldContinue)
{}
diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h
index 2863ddd572..329183c5af 100644
--- a/ydb/core/base/appdata_fwd.h
+++ b/ydb/core/base/appdata_fwd.h
@@ -59,6 +59,7 @@ namespace NKikimrConfig {
class TDomainsConfig;
class TBootstrap;
class TAwsCompatibilityConfig;
+ class TS3ProxyResolverConfig;
}
namespace NKikimrNetClassifier {
@@ -201,6 +202,7 @@ struct TAppData {
std::unique_ptr<NKikimrConfig::TDomainsConfig> DomainsConfigPtr;
std::unique_ptr<NKikimrConfig::TBootstrap> BootstrapConfigPtr;
std::unique_ptr<NKikimrConfig::TAwsCompatibilityConfig> AwsCompatibilityConfigPtr;
+ std::unique_ptr<NKikimrConfig::TS3ProxyResolverConfig> S3ProxyResolverConfigPtr;
std::unique_ptr<NKikimrSharedCache::TSharedCacheConfig> SharedCacheConfigPtr;
NKikimrStream::TStreamingConfig& StreamingConfig;
@@ -223,6 +225,7 @@ struct TAppData {
NKikimrConfig::TDomainsConfig& DomainsConfig;
NKikimrConfig::TBootstrap& BootstrapConfig;
NKikimrConfig::TAwsCompatibilityConfig& AwsCompatibilityConfig;
+ NKikimrConfig::TS3ProxyResolverConfig& S3ProxyResolverConfig;
bool EnforceUserTokenRequirement = false;
bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request
bool EnableKqpSpilling = false;
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 300affda40..7476c90fcc 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1093,6 +1093,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
AppData->AwsCompatibilityConfig = runConfig.AppConfig.GetAwsCompatibilityConfig();
}
+ if (runConfig.AppConfig.HasS3ProxyResolverConfig()) {
+ AppData->S3ProxyResolverConfig = runConfig.AppConfig.GetS3ProxyResolverConfig();
+ }
+
// setup resource profiles
AppData->ResourceProfiles = new TResourceProfiles;
if (runConfig.AppConfig.GetBootstrapConfig().ResourceProfilesSize())
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5dc2e109b5..8cc3ff2f04 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -752,6 +752,25 @@ message THttpProxyConfig {
optional string JwtFile = 10;
}
+message TS3ProxyResolverConfig {
+ message THttpResolverConfig {
+ // resolve proxy host to connect to that endpoint using this url
+ optional string ResolveUrl = 1;
+ // use resolved proxy through these ports
+ optional uint32 HttpPort = 2;
+ optional uint32 HttpsPort = 3;
+ }
+
+ message TEndpoint {
+ // S3 endpoint
+ optional string Endpoint = 1;
+ oneof Resolver {
+ THttpResolverConfig HttpResolver = 2;
+ }
+ }
+
+ repeated TEndpoint Endpoints = 1;
+}
message TSqsConfig {
optional bool EnableSqs = 5;
@@ -1671,6 +1690,7 @@ message TAppConfig {
optional TQueryServiceConfig QueryServiceConfig = 73;
optional TConveyorConfig InsertConveyorConfig = 74;
optional bool AllowEditYamlInUi = 75;
+ optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto
index 61daa66137..4e03297c7a 100644
--- a/ydb/core/protos/console_config.proto
+++ b/ydb/core/protos/console_config.proto
@@ -135,6 +135,7 @@ message TConfigItem {
QueryServiceConfigItem = 73;
InsertConveyorConfigItem = 74;
AllowEditYamlInUiItem = 75;
+ S3ProxyResolverConfigItem = 76;
NamedConfigsItem = 100;
ClusterYamlConfigItem = 101;
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index 56407e24ef..effe935e59 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -150,6 +150,7 @@ namespace NActors {
nodeAppData->ColumnShardConfig = app0->ColumnShardConfig;
nodeAppData->MeteringConfig = app0->MeteringConfig;
nodeAppData->AwsCompatibilityConfig = app0->AwsCompatibilityConfig;
+ nodeAppData->S3ProxyResolverConfig = app0->S3ProxyResolverConfig;
nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot;
nodeAppData->IoContextFactory = app0->IoContextFactory;
if (KeyConfigGenerator) {
diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp
index 1a0a0974b2..e7b5363d8d 100644
--- a/ydb/core/testlib/basics/appdata.cpp
+++ b/ydb/core/testlib/basics/appdata.cpp
@@ -57,6 +57,7 @@ namespace NKikimr {
app->SchemeShardConfig = SchemeShardConfig;
app->MeteringConfig = MeteringConfig;
app->AwsCompatibilityConfig = AwsCompatibilityConfig;
+ app->S3ProxyResolverConfig = S3ProxyResolverConfig;
app->FeatureFlags = FeatureFlags;
// This is a special setting active in test runtime only
diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h
index b902ce89cb..a17e2b0ac4 100644
--- a/ydb/core/testlib/basics/appdata.h
+++ b/ydb/core/testlib/basics/appdata.h
@@ -96,6 +96,7 @@ namespace NKikimr {
NKikimrConfig::TMeteringConfig MeteringConfig;
NKikimrPQ::TPQConfig PQConfig;
NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig;
+ NKikimrConfig::TS3ProxyResolverConfig S3ProxyResolverConfig;
private:
TAutoPtr<TMine> Mine;
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt
index 0159a00ee0..ca3bfea616 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt
@@ -108,6 +108,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
@@ -376,6 +377,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index 0159a00ee0..ca3bfea616 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -108,6 +108,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
@@ -376,6 +377,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 414a952f2b..83f3d65fee 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -109,6 +109,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
@@ -378,6 +379,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 414a952f2b..83f3d65fee 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -109,6 +109,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
@@ -378,6 +379,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index 15228d5d60..99d4810d8a 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -109,6 +109,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
@@ -373,6 +374,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
library-actors-core
+ library-actors-http
cpp-containers-absl_flat_hash
cpp-containers-stack_vector
cpp-digest-md5
diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h
deleted file mode 100644
index c1e7cc8b08..0000000000
--- a/ydb/core/tx/datashard/export_s3_base_uploader.h
+++ /dev/null
@@ -1,548 +0,0 @@
-#pragma once
-#ifndef KIKIMR_DISABLE_S3_OPS
-
-#include "datashard.h"
-#include "export_common.h"
-#include "export_s3.h"
-#include "extstorage_usage_config.h"
-
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/library/services/services.pb.h>
-#include <ydb/core/wrappers/s3_storage_config.h>
-#include <ydb/core/wrappers/s3_wrapper.h>
-#include <ydb/core/wrappers/events/common.h>
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-#include <ydb/library/actors/core/hfunc.h>
-#include <library/cpp/random_provider/random_provider.h>
-
-#include <util/generic/buffer.h>
-#include <util/generic/maybe.h>
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/string/builder.h>
-#include <util/string/cast.h>
-
-#include <google/protobuf/text_format.h>
-
-namespace NKikimr {
-namespace NDataShard {
-
-class IProxyOps {
-public:
- virtual ~IProxyOps() = default;
- virtual bool NeedToResolveProxy() const = 0;
- virtual void ResolveProxy() = 0;
-
-}; // IProxyOps
-
-template <typename TDerived>
-class TS3UploaderBase: public TActorBootstrapped<TDerived>
- , public IProxyOps
-{
- using TEvExternalStorage = NWrappers::TEvExternalStorage;
- using TEvBuffer = TEvExportScan::TEvBuffer<TBuffer>;
-
-protected:
- void Restart() {
- Y_ABORT_UNLESS(ProxyResolved);
-
- MultiPart = false;
- Last = false;
- Parts.clear();
-
- if (Attempt) {
- this->Send(std::exchange(Client, TActorId()), new TEvents::TEvPoisonPill());
- }
-
- Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
-
- if (!MetadataUploaded) {
- UploadMetadata();
- } else if (!SchemeUploaded) {
- UploadScheme();
- } else {
- this->Become(&TDerived::StateUploadData);
-
- if (Attempt) {
- this->Send(std::exchange(Scanner, TActorId()), new TEvExportScan::TEvReset());
- } else if (Scanner) {
- this->Send(Scanner, new TEvExportScan::TEvFeed());
- }
- }
- }
-
- void UploadScheme() {
- Y_ABORT_UNLESS(!SchemeUploaded);
-
- if (!Scheme) {
- return Finish(false, "Cannot infer scheme");
- }
-
- google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);
-
- auto request = Aws::S3::Model::PutObjectRequest()
- .WithKey(Settings.GetSchemeKey())
- .WithStorageClass(Settings.GetStorageClass());
- this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
-
- this->Become(&TDerived::StateUploadScheme);
- }
-
- void UploadMetadata() {
- Y_ABORT_UNLESS(!MetadataUploaded);
-
- Buffer = std::move(Metadata);
-
- auto request = Aws::S3::Model::PutObjectRequest()
- .WithKey(Settings.GetMetadataKey())
- .WithStorageClass(Settings.GetStorageClass());
- this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
-
- this->Become(&TDerived::StateUploadMetadata);
- }
-
- void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("HandleScheme TEvExternalStorage::TEvPutObjectResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (!CheckResult(result, TStringBuf("PutObject (scheme)"))) {
- return;
- }
-
- SchemeUploaded = true;
-
- if (Scanner) {
- this->Send(Scanner, new TEvExportScan::TEvFeed());
- }
-
- this->Become(&TDerived::StateUploadData);
- }
-
- void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("HandleMetadata TEvExternalStorage::TEvPutObjectResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) {
- return;
- }
-
- MetadataUploaded = true;
-
- UploadScheme();
- }
-
- void Handle(TEvExportScan::TEvReady::TPtr& ev) {
- EXPORT_LOG_D("Handle TEvExportScan::TEvReady"
- << ": self# " << this->SelfId()
- << ", sender# " << ev->Sender);
-
- Scanner = ev->Sender;
-
- if (Error) {
- return PassAway();
- }
-
- if (ProxyResolved && SchemeUploaded && MetadataUploaded) {
- this->Send(Scanner, new TEvExportScan::TEvFeed());
- }
- }
-
- void Handle(TEvBuffer::TPtr& ev) {
- EXPORT_LOG_D("Handle TEvExportScan::TEvBuffer"
- << ": self# " << this->SelfId()
- << ", sender# " << ev->Sender
- << ", msg# " << ev->Get()->ToString());
-
- if (ev->Sender != Scanner) {
- EXPORT_LOG_W("Received buffer from unknown scanner"
- << ": self# " << this->SelfId()
- << ", sender# " << ev->Sender
- << ", scanner# " << Scanner);
- return;
- }
-
- Last = ev->Get()->Last;
- MultiPart = MultiPart || !Last;
- ev->Get()->Buffer.AsString(Buffer);
-
- UploadData();
- }
-
- void UploadData() {
- if (!MultiPart) {
- auto request = Aws::S3::Model::PutObjectRequest()
- .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
- .WithStorageClass(Settings.GetStorageClass());
- this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
- } else {
- if (!UploadId) {
- this->Send(DataShard, new TEvDataShard::TEvGetS3Upload(this->SelfId(), TxId));
- return;
- }
-
- auto request = Aws::S3::Model::UploadPartRequest()
- .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
- .WithUploadId(*UploadId)
- .WithPartNumber(Parts.size() + 1);
- this->Send(Client, new TEvExternalStorage::TEvUploadPartRequest(request, std::move(Buffer)));
- }
- }
-
- void HandleData(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("HandleData TEvExternalStorage::TEvPutObjectResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (!CheckResult(result, TStringBuf("PutObject (data)"))) {
- return;
- }
-
- Finish();
- }
-
- void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) {
- auto& upload = ev->Get()->Upload;
-
- EXPORT_LOG_D("Handle TEvDataShard::TEvS3Upload"
- << ": self# " << this->SelfId()
- << ", upload# " << upload);
-
- if (!upload) {
- auto request = Aws::S3::Model::CreateMultipartUploadRequest()
- .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
- .WithStorageClass(Settings.GetStorageClass());
- this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request));
- } else {
- UploadId = upload->Id;
-
- switch (upload->Status) {
- case TS3Upload::EStatus::UploadParts:
- return UploadData();
-
- case TS3Upload::EStatus::Complete: {
- Parts = std::move(upload->Parts);
-
- TVector<Aws::S3::Model::CompletedPart> parts(Reserve(Parts.size()));
- for (ui32 partIndex = 0; partIndex < Parts.size(); ++partIndex) {
- parts.emplace_back(Aws::S3::Model::CompletedPart()
- .WithPartNumber(partIndex + 1)
- .WithETag(Parts.at(partIndex)));
- }
-
- auto request = Aws::S3::Model::CompleteMultipartUploadRequest()
- .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
- .WithUploadId(*UploadId)
- .WithMultipartUpload(Aws::S3::Model::CompletedMultipartUpload().WithParts(std::move(parts)));
- this->Send(Client, new TEvExternalStorage::TEvCompleteMultipartUploadRequest(request));
- break;
- }
-
- case TS3Upload::EStatus::Abort: {
- Error = std::move(upload->Error);
- if (!Error) {
- Error = "<empty>";
- }
-
- auto request = Aws::S3::Model::AbortMultipartUploadRequest()
- .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
- .WithUploadId(*UploadId);
- this->Send(Client, new TEvExternalStorage::TEvAbortMultipartUploadRequest(request));
- break;
- }
- }
- }
- }
-
- void Handle(TEvExternalStorage::TEvCreateMultipartUploadResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("Handle TEvExternalStorage::TEvCreateMultipartUploadResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (!CheckResult(result, TStringBuf("CreateMultipartUpload"))) {
- return;
- }
-
- this->Send(DataShard, new TEvDataShard::TEvStoreS3UploadId(this->SelfId(), TxId, result.GetResult().GetUploadId().c_str()));
- }
-
- void Handle(TEvExternalStorage::TEvUploadPartResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("Handle TEvExternalStorage::TEvUploadPartResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (!CheckResult(result, TStringBuf("UploadPart"))) {
- return;
- }
-
- Parts.push_back(result.GetResult().GetETag().c_str());
-
- if (Last) {
- return Finish();
- }
-
- this->Send(Scanner, new TEvExportScan::TEvFeed());
- }
-
- void Handle(TEvExternalStorage::TEvCompleteMultipartUploadResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("Handle TEvExternalStorage::TEvCompleteMultipartUploadResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (result.IsSuccess()) {
- return PassAway();
- }
-
- const auto& error = result.GetError();
- if (error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD) {
- return PassAway();
- }
-
- if (CanRetry(error)) {
- UploadId.Clear(); // force getting info after restart
- Retry();
- } else {
- Error = error.GetMessage().c_str();
- PassAway();
- }
- }
-
- void Handle(TEvExternalStorage::TEvAbortMultipartUploadResponse::TPtr& ev) {
- const auto& result = ev->Get()->Result;
-
- EXPORT_LOG_D("Handle TEvExternalStorage::TEvAbortMultipartUploadResponse"
- << ": self# " << this->SelfId()
- << ", result# " << result);
-
- if (result.IsSuccess()) {
- return PassAway();
- }
-
- const auto& error = result.GetError();
- if (CanRetry(error)) {
- UploadId.Clear(); // force getting info after restart
- Retry();
- } else {
- Y_ABORT_UNLESS(Error);
- Error = TStringBuilder() << *Error << " Additionally, 'AbortMultipartUpload' has failed: "
- << error.GetMessage();
- PassAway();
- }
- }
-
- template <typename TResult>
- bool CheckResult(const TResult& result, const TStringBuf marker) {
- if (result.IsSuccess()) {
- return true;
- }
-
- EXPORT_LOG_E("Error at '" << marker << "'"
- << ": self# " << this->SelfId()
- << ", error# " << result);
- RetryOrFinish(result.GetError());
-
- return false;
- }
-
- static bool ShouldRetry(const Aws::S3::S3Error& error) {
- if (error.ShouldRetry()) {
- return true;
- }
-
- if ("TooManyRequests" == error.GetExceptionName()) {
- return true;
- }
-
- return false;
- }
-
- bool CanRetry(const Aws::S3::S3Error& error) const {
- return Attempt < Retries && ShouldRetry(error);
- }
-
- void Retry() {
- Delay = Min(Delay * ++Attempt, TDuration::Minutes(10));
- const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
- this->Schedule(Delay + random, new TEvents::TEvWakeup());
- }
-
- void RetryOrFinish(const Aws::S3::S3Error& error) {
- if (CanRetry(error)) {
- Retry();
- } else {
- Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
- }
- }
-
- void Finish(bool success = true, const TString& error = TString()) {
- EXPORT_LOG_I("Finish"
- << ": self# " << this->SelfId()
- << ", success# " << success
- << ", error# " << error
- << ", multipart# " << MultiPart
- << ", uploadId# " << UploadId);
-
- if (!success) {
- Error = error;
- }
-
- if (!MultiPart || !UploadId) {
- if (!Scanner) {
- return;
- }
-
- PassAway();
- } else {
- if (success) {
- this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId,
- TS3Upload::EStatus::Complete, std::move(Parts)));
- } else {
- this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId,
- TS3Upload::EStatus::Abort, *Error));
- }
- }
- }
-
- void PassAway() override {
- if (Scanner) {
- this->Send(Scanner, new TEvExportScan::TEvFinish(Error.Empty(), Error.GetOrElse(TString())));
- }
-
- this->Send(Client, new TEvents::TEvPoisonPill());
-
- IActor::PassAway();
- }
-
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR;
- }
-
- static constexpr TStringBuf LogPrefix() {
- return "s3"sv;
- }
-
- explicit TS3UploaderBase(
- const TActorId& dataShard, ui64 txId,
- const NKikimrSchemeOp::TBackupTask& task,
- TMaybe<Ydb::Table::CreateTableRequest>&& scheme,
- TString&& metadata)
- : ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(task.GetS3Settings()))
- , Settings(TS3Settings::FromBackupTask(task))
- , DataFormat(NBackupRestoreTraits::EDataFormat::Csv)
- , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task))
- , DataShard(dataShard)
- , TxId(txId)
- , Scheme(std::move(scheme))
- , Metadata(std::move(metadata))
- , Retries(task.GetNumberOfRetries())
- , Attempt(0)
- , Delay(TDuration::Minutes(1))
- , SchemeUploaded(task.GetShardNum() == 0 ? false : true)
- , MetadataUploaded(task.GetShardNum() == 0 ? false : true)
- {
- }
-
- void Bootstrap() {
- EXPORT_LOG_D("Bootstrap"
- << ": self# " << this->SelfId()
- << ", attempt# " << Attempt);
-
- ProxyResolved = !NeedToResolveProxy();
- if (!ProxyResolved) {
- ResolveProxy();
- } else {
- Restart();
- }
- }
-
- STATEFN(StateBase) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvExportScan::TEvReady, Handle);
-
- sFunc(TEvents::TEvWakeup, Bootstrap);
- sFunc(TEvents::TEvPoisonPill, PassAway);
- }
- }
-
- STATEFN(StateUploadScheme) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleScheme);
- default:
- return StateBase(ev);
- }
- }
-
- STATEFN(StateUploadMetadata) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata);
- default:
- return StateBase(ev);
- }
- }
-
- STATEFN(StateUploadData) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvBuffer, Handle);
- hFunc(TEvDataShard::TEvS3Upload, Handle);
-
- hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleData);
- hFunc(TEvExternalStorage::TEvCreateMultipartUploadResponse, Handle);
- hFunc(TEvExternalStorage::TEvUploadPartResponse, Handle);
- hFunc(TEvExternalStorage::TEvCompleteMultipartUploadResponse, Handle);
- hFunc(TEvExternalStorage::TEvAbortMultipartUploadResponse, Handle);
- default:
- return StateBase(ev);
- }
- }
-
-protected:
- NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
- TS3Settings Settings;
- const NBackupRestoreTraits::EDataFormat DataFormat;
- const NBackupRestoreTraits::ECompressionCodec CompressionCodec;
- bool ProxyResolved;
-
-private:
- const TActorId DataShard;
- const ui64 TxId;
- const TMaybe<Ydb::Table::CreateTableRequest> Scheme;
- const TString Metadata;
-
- const ui32 Retries;
- ui32 Attempt;
-
- TActorId Client;
- TDuration Delay;
- bool SchemeUploaded;
- bool MetadataUploaded;
- bool MultiPart;
- bool Last;
-
- TActorId Scanner;
- TString Buffer;
-
- TMaybe<TString> UploadId;
- TVector<TString> Parts;
- TMaybe<TString> Error;
-
-}; // TS3UploaderBase
-
-} // NDataShard
-} // NKikimr
-
-#endif // KIKIMR_DISABLE_S3_OPS
diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp
index 0ff72f3f04..9419cf2c9b 100644
--- a/ydb/core/tx/datashard/export_s3_uploader.cpp
+++ b/ydb/core/tx/datashard/export_s3_uploader.cpp
@@ -1,24 +1,655 @@
#ifndef KIKIMR_DISABLE_S3_OPS
-#include "export_s3_base_uploader.h"
-
#include "backup_restore_common.h"
+#include "datashard.h"
+#include "export_common.h"
+#include "export_s3.h"
+#include "extstorage_usage_config.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/library/services/services.pb.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/events/common.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/actors/http/http_proxy.h>
+#include <library/cpp/random_provider/random_provider.h>
+
+#include <util/generic/buffer.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+
+#include <google/protobuf/text_format.h>
namespace NKikimr {
namespace NDataShard {
-class TS3Uploader: public TS3UploaderBase<TS3Uploader> {
-protected:
- bool NeedToResolveProxy() const override {
+class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
+ using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig;
+ using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig;
+ using TEvExternalStorage = NWrappers::TEvExternalStorage;
+ using TEvBuffer = TEvExportScan::TEvBuffer<TBuffer>;
+
+    // Scans the configured S3 proxy resolver endpoints and returns the HTTP resolver
+    // settings of the first entry whose endpoint equals the given one and that has an
+    // HTTP resolver section. Returns Nothing() when there is no such entry.
+    static TMaybe<THttpResolverConfig> GetHttpResolverConfig(TStringBuf endpoint) {
+        const auto& endpoints = AppData()->S3ProxyResolverConfig.GetEndpoints();
+        for (const auto& candidate : endpoints) {
+            if (candidate.GetEndpoint() != endpoint) {
+                continue;
+            }
+            if (!candidate.HasHttpResolver()) {
+                continue;
+            }
+            return candidate.GetHttpResolver();
+        }
+
+        return Nothing();
+    }
+
+    // Strips one leading scheme ("http://", otherwise "https://") and then one trailing
+    // default port (":80", otherwise ":443") from the endpoint, if present.
+    static TStringBuf NormalizeEndpoint(TStringBuf endpoint) {
+        if (!endpoint.SkipPrefix("http://")) {
+            Y_UNUSED(endpoint.SkipPrefix("https://"));
+        }
+        if (!endpoint.ChopSuffix(":80")) {
+            Y_UNUSED(endpoint.ChopSuffix(":443"));
+        }
+        return endpoint;
+    }
+
+ // Convenience overload: normalizes the endpointOverride of the given S3 settings
+ // and looks up the HTTP resolver config registered for it.
+ static TMaybe<THttpResolverConfig> GetHttpResolverConfig(const TS3ExternalStorageConfig& settings) {
+ return GetHttpResolverConfig(NormalizeEndpoint(settings.GetConfig().endpointOverride));
+ }
+
+ // Returns ExternalStorageConfig downcast to the S3-specific config type.
+ // The result is null if the stored config has a different dynamic type.
+ std::shared_ptr<TS3ExternalStorageConfig> GetS3StorageConfig() const {
+ return std::dynamic_pointer_cast<TS3ExternalStorageConfig>(ExternalStorageConfig);
+ }
+
+    // Builds the URL that is queried to resolve the proxy host: the scheme taken from the
+    // S3 settings followed by the configured resolve URL. Requires HttpResolverConfig.
+    TString GetResolveProxyUrl(const TS3ExternalStorageConfig& settings) const {
+        Y_ABORT_UNLESS(HttpResolverConfig);
+
+        TStringBuilder result;
+        const auto scheme = settings.GetConfig().scheme;
+        if (scheme == Aws::Http::Scheme::HTTP) {
+            result << "http://";
+        } else if (scheme == Aws::Http::Scheme::HTTPS) {
+            result << "https://";
+        }
+
+        result << HttpResolverConfig->GetResolveUrl();
+        return result;
+    }
+
+    // Writes the resolved proxy into the S3 client configuration: the proxy reuses the
+    // scheme and CA path of the main connection, and the port is taken from the resolver
+    // config according to that scheme. Requires HttpResolverConfig.
+    void ApplyProxy(TS3ExternalStorageConfig& settings, const TString& proxyHost) const {
+        Y_ABORT_UNLESS(HttpResolverConfig);
+
+        auto& config = settings.ConfigRef();
+        config.proxyScheme = config.scheme;
+        config.proxyHost = proxyHost;
+        config.proxyCaPath = config.caPath;
+
+        if (config.proxyScheme == Aws::Http::Scheme::HTTP) {
+            config.proxyPort = HttpResolverConfig->GetHttpPort();
+        } else if (config.proxyScheme == Aws::Http::Scheme::HTTPS) {
+            config.proxyPort = HttpResolverConfig->GetHttpsPort();
+        }
+    }
+
+    // Sends a GET request for the resolve URL through the HTTP proxy actor (created on
+    // first use) with a 10 second timeout and switches to StateResolveProxy.
+    void ResolveProxy() {
+        if (!HttpProxy) {
+            HttpProxy = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()));
+        }
+
+        const TString url = GetResolveProxyUrl(*GetS3StorageConfig());
+        auto request = NHttp::THttpOutgoingRequest::CreateRequestGet(url);
+        Send(HttpProxy, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(std::move(request), TDuration::Seconds(10)));
+
+        Become(&TThis::StateResolveProxy);
+    }
+
+ // Handles the reply of the proxy resolver. On success the response body is used
+ // verbatim as the proxy host and the upload pipeline is (re)started.
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev) {
+ const auto& msg = *ev->Get();
+
+ EXPORT_LOG_D("Handle NHttp::TEvHttpProxy::TEvHttpIncomingResponse"
+ << ": self# " << SelfId()
+ << ", status# " << (msg.Response ? msg.Response->Status : "null")
+ << ", body# " << (msg.Response ? msg.Response->Body : "null"));
+
+ // No response at all, or a status that does not start with "200": report it as
+ // SERVICE_UNAVAILABLE so the usual retry-or-finish logic applies.
+ // NOTE(review): the `true` passed to S3Error is presumably the "retryable" flag - confirm against the AWS SDK.
+ if (!msg.Response || !msg.Response->Status.StartsWith("200")) {
+ EXPORT_LOG_E("Error at 'GetProxy'"
+ << ": self# " << SelfId()
+ << ", error# " << msg.GetError());
+ return RetryOrFinish(Aws::S3::S3Error({Aws::S3::S3Errors::SERVICE_UNAVAILABLE, true}));
+ }
+
+ // A body containing '<' is rejected as invalid
+ // (presumably it indicates an HTML/XML page rather than a bare host name - TODO confirm).
+ if (msg.Response->Body.find('<') != TStringBuf::npos) {
+ EXPORT_LOG_E("Error at 'GetProxy'"
+ << ": self# " << SelfId()
+ << ", error# " << "invalid body"
+ << ", body# " << msg.Response->Body);
+ return RetryOrFinish(Aws::S3::S3Error({Aws::S3::S3Errors::SERVICE_UNAVAILABLE, true}));
+ }
+
+ ApplyProxy(*GetS3StorageConfig(), TString(msg.Response->Body));
+ ProxyResolved = true;
+
+ const auto& cfg = GetS3StorageConfig()->GetConfig();
+ EXPORT_LOG_N("Using proxy: "
+ << (cfg.proxyScheme == Aws::Http::Scheme::HTTPS ? "https://" : "http://")
+ << cfg.proxyHost << ":" << cfg.proxyPort);
+
+ Restart();
+ }
+
+    // (Re)starts the upload pipeline; must only be called once the proxy is resolved.
+    //
+    // Resets the per-attempt multipart state, replaces the S3 client actor and resumes
+    // from the first step that has not completed yet: metadata, then scheme, then data.
+    void Restart() {
+        Y_ABORT_UNLESS(ProxyResolved);
+
+        MultiPart = false;
+        Last = false;
+        Parts.clear();
+
+        // Poison only a client that actually exists. A retry that happens while the proxy
+        // is still being resolved bumps Attempt without any client having been created,
+        // so checking Attempt here would send a poison pill to a null actor id.
+        if (Client) {
+            Send(std::exchange(Client, TActorId()), new TEvents::TEvPoisonPill());
+        }
+
+        Client = RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+
+        if (!MetadataUploaded) {
+            UploadMetadata();
+        } else if (!SchemeUploaded) {
+            UploadScheme();
+        } else {
+            Become(&TThis::StateUploadData);
+
+            if (Attempt) {
+                // On a retry the scanner is told to reset and is forgotten here.
+                Send(std::exchange(Scanner, TActorId()), new TEvExportScan::TEvReset());
+            } else if (Scanner) {
+                Send(Scanner, new TEvExportScan::TEvFeed());
+            }
+        }
+    }
+
+    // Prints the table scheme in protobuf text format into Buffer and uploads it under
+    // the scheme key; the response is awaited in StateUploadScheme. Finishes with an
+    // error when no scheme is available.
+    void UploadScheme() {
+        Y_ABORT_UNLESS(!SchemeUploaded);
+
+        if (Scheme.Empty()) {
+            Finish(false, "Cannot infer scheme");
+            return;
+        }
+
+        google::protobuf::TextFormat::PrintToString(*Scheme, &Buffer);
+
+        auto putScheme = Aws::S3::Model::PutObjectRequest()
+            .WithKey(Settings.GetSchemeKey())
+            .WithStorageClass(Settings.GetStorageClass());
+        Send(Client, new TEvExternalStorage::TEvPutObjectRequest(putScheme, std::move(Buffer)));
+
+        Become(&TThis::StateUploadScheme);
+    }
+
+    // Uploads the export metadata under the metadata key; the response is awaited in
+    // StateUploadMetadata.
+    void UploadMetadata() {
+        Y_ABORT_UNLESS(!MetadataUploaded);
+
+        // Metadata is a const member, so std::move() on it was always a silent copy.
+        // Spell it as a copy: the original must stay intact because the upload is
+        // repeated from this member on every retry.
+        Buffer = Metadata;
+
+        auto request = Aws::S3::Model::PutObjectRequest()
+            .WithKey(Settings.GetMetadataKey())
+            .WithStorageClass(Settings.GetStorageClass());
+        Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
+
+        Become(&TThis::StateUploadMetadata);
+    }
+
+    // Response to the scheme upload: on success marks the scheme as uploaded, asks the
+    // scanner (if already known) for data and moves on to StateUploadData.
+    void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+        const auto& putResult = ev->Get()->Result;
+
+        EXPORT_LOG_D("HandleScheme TEvExternalStorage::TEvPutObjectResponse"
+            << ": self# " << SelfId()
+            << ", result# " << putResult);
+
+        const bool ok = CheckResult(putResult, TStringBuf("PutObject (scheme)"));
+        if (ok) {
+            SchemeUploaded = true;
+
+            if (Scanner) {
+                Send(Scanner, new TEvExportScan::TEvFeed());
+            }
+
+            Become(&TThis::StateUploadData);
+        }
+    }
+
+    // Response to the metadata upload: on success marks the metadata as uploaded and
+    // proceeds with the scheme upload.
+    void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+        const auto& putResult = ev->Get()->Result;
+
+        EXPORT_LOG_D("HandleMetadata TEvExternalStorage::TEvPutObjectResponse"
+            << ": self# " << SelfId()
+            << ", result# " << putResult);
+
+        const bool ok = CheckResult(putResult, TStringBuf("PutObject (metadata)"));
+        if (ok) {
+            MetadataUploaded = true;
+            UploadScheme();
+        }
+    }
+
+ void Handle(TEvExportScan::TEvReady::TPtr& ev) {
+ EXPORT_LOG_D("Handle TEvExportScan::TEvReady"
+ << ": self# " << this->SelfId()
+ << ", sender# " << ev->Sender);
+
+ Scanner = ev->Sender;
+
+ if (Error) {
+ return PassAway();
+ }
+
+ if (ProxyResolved && SchemeUploaded && MetadataUploaded) {
+ this->Send(Scanner, new TEvExportScan::TEvFeed());
+ }
+ }
+
+    // A chunk of data from the scanner. Buffers from anyone but the current scanner are
+    // ignored. Any chunk that is not the last one switches the upload to multipart mode.
+    void Handle(TEvBuffer::TPtr& ev) {
+        EXPORT_LOG_D("Handle TEvExportScan::TEvBuffer"
+            << ": self# " << SelfId()
+            << ", sender# " << ev->Sender
+            << ", msg# " << ev->Get()->ToString());
+
+        if (Scanner != ev->Sender) {
+            EXPORT_LOG_W("Received buffer from unknown scanner"
+                << ": self# " << SelfId()
+                << ", sender# " << ev->Sender
+                << ", scanner# " << Scanner);
+            return;
+        }
+
+        auto* chunk = ev->Get();
+        Last = chunk->Last;
+        if (!Last) {
+            MultiPart = true;
+        }
+        chunk->Buffer.AsString(Buffer);
+
+        UploadData();
+    }
+
+    // Uploads the buffered chunk: as a single object when the export fits in one chunk,
+    // otherwise as the next part of a multipart upload. If the multipart upload id is
+    // not known yet, it is requested from the datashard first.
+    void UploadData() {
+        if (!MultiPart) {
+            auto putData = Aws::S3::Model::PutObjectRequest()
+                .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+                .WithStorageClass(Settings.GetStorageClass());
+            Send(Client, new TEvExternalStorage::TEvPutObjectRequest(putData, std::move(Buffer)));
+            return;
+        }
+
+        if (!UploadId) {
+            Send(DataShard, new TEvDataShard::TEvGetS3Upload(SelfId(), TxId));
+            return;
+        }
+
+        auto uploadPart = Aws::S3::Model::UploadPartRequest()
+            .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+            .WithUploadId(*UploadId)
+            .WithPartNumber(Parts.size() + 1);
+        Send(Client, new TEvExternalStorage::TEvUploadPartRequest(uploadPart, std::move(Buffer)));
+    }
+
+    // Response to a single-object data upload: success completes the export.
+    void HandleData(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+        const auto& putResult = ev->Get()->Result;
+
+        EXPORT_LOG_D("HandleData TEvExternalStorage::TEvPutObjectResponse"
+            << ": self# " << SelfId()
+            << ", result# " << putResult);
+
+        if (CheckResult(putResult, TStringBuf("PutObject (data)"))) {
+            Finish();
+        }
+    }
+
+ // The datashard reports the multipart upload stored for this transaction (if any).
+ // Depending on its presence and status, a new upload is created, part uploading
+ // continues, or the stored upload is completed or aborted.
+ void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) {
+ auto& upload = ev->Get()->Upload;
+
+ EXPORT_LOG_D("Handle TEvDataShard::TEvS3Upload"
+ << ": self# " << this->SelfId()
+ << ", upload# " << upload);
+
+ if (!upload) {
+ // Nothing stored yet: start a new multipart upload.
+ auto request = Aws::S3::Model::CreateMultipartUploadRequest()
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+ .WithStorageClass(Settings.GetStorageClass());
+ this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request));
+ } else {
+ UploadId = upload->Id;
+
+ switch (upload->Status) {
+ case TS3Upload::EStatus::UploadParts:
+ return UploadData();
+
+ case TS3Upload::EStatus::Complete: {
+ // Take the stored ETags and complete the upload; part numbers are 1-based
+ // positions in the stored list.
+ Parts = std::move(upload->Parts);
+
+ TVector<Aws::S3::Model::CompletedPart> parts(Reserve(Parts.size()));
+ for (ui32 partIndex = 0; partIndex < Parts.size(); ++partIndex) {
+ parts.emplace_back(Aws::S3::Model::CompletedPart()
+ .WithPartNumber(partIndex + 1)
+ .WithETag(Parts.at(partIndex)));
+ }
+
+ auto request = Aws::S3::Model::CompleteMultipartUploadRequest()
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+ .WithUploadId(*UploadId)
+ .WithMultipartUpload(Aws::S3::Model::CompletedMultipartUpload().WithParts(std::move(parts)));
+ this->Send(Client, new TEvExternalStorage::TEvCompleteMultipartUploadRequest(request));
+ break;
+ }
+
+ case TS3Upload::EStatus::Abort: {
+ // Error must be set from here on: the abort response handler asserts it
+ // and PassAway() reports it to the scanner.
+ Error = std::move(upload->Error);
+ if (!Error) {
+ Error = "<empty>";
+ }
+
+ auto request = Aws::S3::Model::AbortMultipartUploadRequest()
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+ .WithUploadId(*UploadId);
+ this->Send(Client, new TEvExternalStorage::TEvAbortMultipartUploadRequest(request));
+ break;
+ }
+ }
+ }
+ }
+
+    // A multipart upload has been created: hand its id to the datashard for storing.
+    void Handle(TEvExternalStorage::TEvCreateMultipartUploadResponse::TPtr& ev) {
+        const auto& outcome = ev->Get()->Result;
+
+        EXPORT_LOG_D("Handle TEvExternalStorage::TEvCreateMultipartUploadResponse"
+            << ": self# " << SelfId()
+            << ", result# " << outcome);
+
+        if (CheckResult(outcome, TStringBuf("CreateMultipartUpload"))) {
+            Send(DataShard, new TEvDataShard::TEvStoreS3UploadId(SelfId(), TxId, outcome.GetResult().GetUploadId().c_str()));
+        }
+    }
+
+    // A part has been uploaded: remember its ETag, then either finish (last part) or ask
+    // the scanner for the next chunk.
+    void Handle(TEvExternalStorage::TEvUploadPartResponse::TPtr& ev) {
+        const auto& outcome = ev->Get()->Result;
+
+        EXPORT_LOG_D("Handle TEvExternalStorage::TEvUploadPartResponse"
+            << ": self# " << SelfId()
+            << ", result# " << outcome);
+
+        if (!CheckResult(outcome, TStringBuf("UploadPart"))) {
+            return;
+        }
+
+        Parts.push_back(outcome.GetResult().GetETag().c_str());
+
+        if (Last) {
+            Finish();
+        } else {
+            Send(Scanner, new TEvExportScan::TEvFeed());
+        }
+    }
+
+    // Response to completing the multipart upload. Success and NO_SUCH_UPLOAD both end
+    // the actor without an error; a retryable error schedules a retry, any other error
+    // is recorded before shutting down.
+    void Handle(TEvExternalStorage::TEvCompleteMultipartUploadResponse::TPtr& ev) {
+        const auto& outcome = ev->Get()->Result;
+
+        EXPORT_LOG_D("Handle TEvExternalStorage::TEvCompleteMultipartUploadResponse"
+            << ": self# " << SelfId()
+            << ", result# " << outcome);
+
+        if (outcome.IsSuccess() || outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD) {
+            PassAway();
+            return;
+        }
+
+        const auto& failure = outcome.GetError();
+        if (!CanRetry(failure)) {
+            Error = failure.GetMessage().c_str();
+            PassAway();
+            return;
+        }
+
+        UploadId.Clear(); // force getting info after restart
+        Retry();
+    }
+
+    // Response to aborting the multipart upload. On success the actor shuts down; a
+    // retryable error schedules a retry; otherwise the abort failure is appended to the
+    // already recorded error before shutting down.
+    void Handle(TEvExternalStorage::TEvAbortMultipartUploadResponse::TPtr& ev) {
+        const auto& outcome = ev->Get()->Result;
+
+        EXPORT_LOG_D("Handle TEvExternalStorage::TEvAbortMultipartUploadResponse"
+            << ": self# " << SelfId()
+            << ", result# " << outcome);
+
+        if (outcome.IsSuccess()) {
+            PassAway();
+            return;
+        }
+
+        const auto& failure = outcome.GetError();
+        if (!CanRetry(failure)) {
+            Y_ABORT_UNLESS(Error);
+            Error = TStringBuilder() << *Error << " Additionally, 'AbortMultipartUpload' has failed: "
+                << failure.GetMessage();
+            PassAway();
+            return;
+        }
+
+        UploadId.Clear(); // force getting info after restart
+        Retry();
+    }
+
+    // Returns true for a successful result. Otherwise logs the failure under the given
+    // marker, triggers retry-or-finish handling and returns false.
+    template <typename TResult>
+    bool CheckResult(const TResult& result, const TStringBuf marker) {
+        if (!result.IsSuccess()) {
+            EXPORT_LOG_E("Error at '" << marker << "'"
+                << ": self# " << SelfId()
+                << ", error# " << result);
+            RetryOrFinish(result.GetError());
+
+            return false;
+        }
+
+        return true;
+    }
+
+ // An error is worth retrying when the S3 error itself says so, or when its
+ // exception name is "TooManyRequests".
+ static bool ShouldRetry(const Aws::S3::S3Error& error) {
+ if (error.ShouldRetry()) {
+ return true;
+ }
+
+ if ("TooManyRequests" == error.GetExceptionName()) {
+ return true;
+ }
+
 return false;
 }
- void ResolveProxy() override {
- Y_ABORT("unreachable");
+ // A retry is possible while the attempt budget (Retries) is not exhausted and the
+ // error is of a retryable kind.
+ bool CanRetry(const Aws::S3::S3Error& error) const {
+ return Attempt < Retries && ShouldRetry(error);
+ }
+
+    // Starts the next attempt after a delay: the base delay is multiplied by the new
+    // attempt number (capped at 10 minutes) and a random extra of less than one base
+    // delay is added. The wake-up event re-runs Bootstrap.
+    void Retry() {
+        ++Attempt;
+        Delay = Min(Delay * Attempt, TDuration::Minutes(10));
+
+        const ui64 jitterUs = TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds();
+        Schedule(Delay + TDuration::FromValue(jitterUs), new TEvents::TEvWakeup());
+    }
+
+ void RetryOrFinish(const Aws::S3::S3Error& error) {
+ if (CanRetry(error)) {
+ Retry();
+ } else {
+ Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
+ }
+ }
+
+ void Finish(bool success = true, const TString& error = TString()) {
+ EXPORT_LOG_I("Finish"
+ << ": self# " << this->SelfId()
+ << ", success# " << success
+ << ", error# " << error
+ << ", multipart# " << MultiPart
+ << ", uploadId# " << UploadId);
+
+ if (!success) {
+ Error = error;
+ }
+
+ if (!MultiPart || !UploadId) {
+ if (!Scanner) {
+ return;
+ }
+
+ PassAway();
+ } else {
+ if (success) {
+ this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId,
+ TS3Upload::EStatus::Complete, std::move(Parts)));
+ } else {
+ this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId,
+ TS3Upload::EStatus::Abort, *Error));
+ }
+ }
+ }
+
+    // Shuts the actor down: stops the helper actors that were actually created and
+    // reports the final result (success, or the recorded error) to the scanner.
+    void PassAway() override {
+        if (HttpProxy) {
+            Send(HttpProxy, new TEvents::TEvPoisonPill());
+        }
+
+        if (Scanner) {
+            Send(Scanner, new TEvExportScan::TEvFinish(Error.Empty(), Error.GetOrElse(TString())));
+        }
+
+        // The S3 client does not exist yet if the actor dies before the first Restart(),
+        // e.g. when proxy resolution fails; do not send a poison pill to a null actor id.
+        if (Client) {
+            Send(Client, new TEvents::TEvPoisonPill());
+        }
+
+        IActor::PassAway();
+    }
public:
- using TS3UploaderBase::TS3UploaderBase;
+ // Activity type under which this actor is registered.
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR;
+ }
+
+ // Short tag identifying this uploader; presumably consumed by the EXPORT_LOG_* macros - TODO confirm.
+ static constexpr TStringBuf LogPrefix() {
+ return "s3"sv;
+ }
+
+    // Creates an uploader for one shard of a backup task.
+    //
+    // dataShard - actor that stores the multipart upload state for the transaction;
+    // txId      - id of the export transaction;
+    // task      - backup task description (S3 settings, retries, shard number, codec);
+    // scheme    - table scheme to upload, if it could be inferred;
+    // metadata  - serialized export metadata.
+    //
+    // Scheme and metadata are treated as already uploaded for every shard except shard 0.
+    explicit TS3Uploader(
+            const TActorId& dataShard, ui64 txId,
+            const NKikimrSchemeOp::TBackupTask& task,
+            TMaybe<Ydb::Table::CreateTableRequest>&& scheme,
+            TString&& metadata)
+        : ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings()))
+        , Settings(TS3Settings::FromBackupTask(task))
+        , DataFormat(NBackupRestoreTraits::EDataFormat::Csv)
+        , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task))
+        , ProxyResolved(false) // recomputed in Bootstrap()
+        , HttpResolverConfig(GetHttpResolverConfig(*GetS3StorageConfig()))
+        , DataShard(dataShard)
+        , TxId(txId)
+        , Scheme(std::move(scheme))
+        , Metadata(std::move(metadata))
+        , Retries(task.GetNumberOfRetries())
+        , Attempt(0)
+        , Delay(TDuration::Minutes(1))
+        , SchemeUploaded(task.GetShardNum() != 0)
+        , MetadataUploaded(task.GetShardNum() != 0)
+        // Finish() can run before the first Restart() (failed proxy resolution) and reads
+        // MultiPart, so these flags must not be left uninitialized.
+        , MultiPart(false)
+        , Last(false)
+    {
+    }
+
+    // Entry point of every attempt (initial start and each retry wake-up). When an HTTP
+    // resolver is configured for the endpoint, the proxy is resolved first; otherwise
+    // the upload pipeline starts immediately.
+    void Bootstrap() {
+        EXPORT_LOG_D("Bootstrap"
+            << ": self# " << SelfId()
+            << ", attempt# " << Attempt);
+
+        ProxyResolved = !HttpResolverConfig.Defined();
+        if (ProxyResolved) {
+            Restart();
+        } else {
+            ResolveProxy();
+        }
+    }
+
+ // Events handled in every state: scanner readiness, wake-ups scheduled by Retry()
+ // (which re-run Bootstrap) and poison pills.
+ STATEFN(StateBase) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExportScan::TEvReady, Handle);
+
+ sFunc(TEvents::TEvWakeup, Bootstrap);
+ sFunc(TEvents::TEvPoisonPill, PassAway);
+ }
+ }
+
+ // Waiting for the proxy resolver's HTTP response; everything else goes to StateBase.
+ STATEFN(StateResolveProxy) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+ // Waiting for the scheme PutObject response; everything else goes to StateBase.
+ STATEFN(StateUploadScheme) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleScheme);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+ // Waiting for the metadata PutObject response; everything else goes to StateBase.
+ STATEFN(StateUploadMetadata) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+ // Data phase: handles scanner buffers, the datashard's upload info and all
+ // single-object / multipart responses from the S3 client; everything else goes to StateBase.
+ STATEFN(StateUploadData) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvBuffer, Handle);
+ hFunc(TEvDataShard::TEvS3Upload, Handle);
+
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleData);
+ hFunc(TEvExternalStorage::TEvCreateMultipartUploadResponse, Handle);
+ hFunc(TEvExternalStorage::TEvUploadPartResponse, Handle);
+ hFunc(TEvExternalStorage::TEvCompleteMultipartUploadResponse, Handle);
+ hFunc(TEvExternalStorage::TEvAbortMultipartUploadResponse, Handle);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+private:
+ NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
+ TS3Settings Settings;
+ const NBackupRestoreTraits::EDataFormat DataFormat;
+ const NBackupRestoreTraits::ECompressionCodec CompressionCodec;
+ bool ProxyResolved;
+
+ TMaybe<THttpResolverConfig> HttpResolverConfig;
+ TActorId HttpProxy;
+
+ const TActorId DataShard;
+ const ui64 TxId;
+ const TMaybe<Ydb::Table::CreateTableRequest> Scheme;
+ const TString Metadata;
+
+ const ui32 Retries;
+ ui32 Attempt;
+
+ TActorId Client;
+ TDuration Delay;
+ bool SchemeUploaded;
+ bool MetadataUploaded;
+ bool MultiPart;
+ bool Last;
+
+ TActorId Scanner;
+ TString Buffer;
+
+ TMaybe<TString> UploadId;
+ TVector<TString> Parts;
+ TMaybe<TString> Error;
}; // TS3Uploader
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 6ef1f11b1f..dde0d8d105 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -216,6 +216,7 @@ RESOURCE(
PEERDIR(
contrib/libs/zstd
ydb/library/actors/core
+ ydb/library/actors/http
library/cpp/containers/absl_flat_hash
library/cpp/containers/stack_vector
library/cpp/digest/md5