diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-12-06 16:03:20 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-12-06 17:12:02 +0300 |
commit | 658905d984103f8f38606af7e82fce0922e10eea (patch) | |
tree | c9ce6cafef3cb6da424a2a88b2c81961c30eefa9 | |
parent | 1c561659d3a0f1fcc89a3cd615ee731f0e2bcf2c (diff) | |
download | ydb-658905d984103f8f38606af7e82fce0922e10eea.tar.gz |
S3 proxy resolver: config & impl KIKIMR-16190
-rw-r--r-- | ydb/core/base/appdata.cpp | 2 | ||||
-rw-r--r-- | ydb/core/base/appdata_fwd.h | 3 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 20 | ||||
-rw-r--r-- | ydb/core/protos/console_config.proto | 1 | ||||
-rw-r--r-- | ydb/core/testlib/actors/test_runtime.cpp | 1 | ||||
-rw-r--r-- | ydb/core/testlib/basics/appdata.cpp | 1 | ||||
-rw-r--r-- | ydb/core/testlib/basics/appdata.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/export_s3_base_uploader.h | 548 | ||||
-rw-r--r-- | ydb/core/tx/datashard/export_s3_uploader.cpp | 647 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ya.make | 1 |
16 files changed, 683 insertions, 556 deletions
diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp index 90086cd905..8fb299a65d 100644 --- a/ydb/core/base/appdata.cpp +++ b/ydb/core/base/appdata.cpp @@ -74,6 +74,7 @@ TAppData::TAppData( , DomainsConfigPtr(new NKikimrConfig::TDomainsConfig()) , BootstrapConfigPtr(new NKikimrConfig::TBootstrap()) , AwsCompatibilityConfigPtr(new NKikimrConfig::TAwsCompatibilityConfig()) + , S3ProxyResolverConfigPtr(new NKikimrConfig::TS3ProxyResolverConfig()) , StreamingConfig(*StreamingConfigPtr.get()) , PQConfig(*PQConfigPtr.get()) , PQClusterDiscoveryConfig(*PQClusterDiscoveryConfigPtr.get()) @@ -94,6 +95,7 @@ TAppData::TAppData( , DomainsConfig(*DomainsConfigPtr.get()) , BootstrapConfig(*BootstrapConfigPtr.get()) , AwsCompatibilityConfig(*AwsCompatibilityConfigPtr.get()) + , S3ProxyResolverConfig(*S3ProxyResolverConfigPtr.get()) , KikimrShouldContinue(kikimrShouldContinue) {} diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 2863ddd572..329183c5af 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -59,6 +59,7 @@ namespace NKikimrConfig { class TDomainsConfig; class TBootstrap; class TAwsCompatibilityConfig; + class TS3ProxyResolverConfig; } namespace NKikimrNetClassifier { @@ -201,6 +202,7 @@ struct TAppData { std::unique_ptr<NKikimrConfig::TDomainsConfig> DomainsConfigPtr; std::unique_ptr<NKikimrConfig::TBootstrap> BootstrapConfigPtr; std::unique_ptr<NKikimrConfig::TAwsCompatibilityConfig> AwsCompatibilityConfigPtr; + std::unique_ptr<NKikimrConfig::TS3ProxyResolverConfig> S3ProxyResolverConfigPtr; std::unique_ptr<NKikimrSharedCache::TSharedCacheConfig> SharedCacheConfigPtr; NKikimrStream::TStreamingConfig& StreamingConfig; @@ -223,6 +225,7 @@ struct TAppData { NKikimrConfig::TDomainsConfig& DomainsConfig; NKikimrConfig::TBootstrap& BootstrapConfig; NKikimrConfig::TAwsCompatibilityConfig& AwsCompatibilityConfig; + NKikimrConfig::TS3ProxyResolverConfig& S3ProxyResolverConfig; bool EnforceUserTokenRequirement = false; bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request bool EnableKqpSpilling = false; diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 300affda40..7476c90fcc 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1093,6 +1093,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) AppData->AwsCompatibilityConfig = runConfig.AppConfig.GetAwsCompatibilityConfig(); } + if (runConfig.AppConfig.HasS3ProxyResolverConfig()) { + AppData->S3ProxyResolverConfig = runConfig.AppConfig.GetS3ProxyResolverConfig(); + } + // setup resource profiles AppData->ResourceProfiles = new TResourceProfiles; if (runConfig.AppConfig.GetBootstrapConfig().ResourceProfilesSize()) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 5dc2e109b5..8cc3ff2f04 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -752,6 +752,25 @@ message THttpProxyConfig { optional string JwtFile = 10; } +message TS3ProxyResolverConfig { + message THttpResolverConfig { + // resolve proxy host to connect to that endpoint using this url + optional string ResolveUrl = 1; + // use resolved proxy through these ports + optional uint32 HttpPort = 2; + optional uint32 HttpsPort = 3; + } + + message TEndpoint { + // S3 endpoint + optional string Endpoint = 1; + oneof Resolver { + THttpResolverConfig HttpResolver = 2; + } + } + + repeated TEndpoint Endpoints = 1; +} message TSqsConfig { optional bool EnableSqs = 5; @@ -1671,6 +1690,7 @@ message TAppConfig { optional TQueryServiceConfig QueryServiceConfig = 73; optional TConveyorConfig InsertConveyorConfig = 74; optional bool AllowEditYamlInUi = 75; + optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index 61daa66137..4e03297c7a 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -135,6 +135,7 @@ message TConfigItem { QueryServiceConfigItem = 73; InsertConveyorConfigItem = 74; AllowEditYamlInUiItem = 75; + S3ProxyResolverConfigItem = 76; NamedConfigsItem = 100; ClusterYamlConfigItem = 101; diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 56407e24ef..effe935e59 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -150,6 +150,7 @@ namespace NActors { nodeAppData->ColumnShardConfig = app0->ColumnShardConfig; nodeAppData->MeteringConfig = app0->MeteringConfig; nodeAppData->AwsCompatibilityConfig = app0->AwsCompatibilityConfig; + nodeAppData->S3ProxyResolverConfig = app0->S3ProxyResolverConfig; nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot; nodeAppData->IoContextFactory = app0->IoContextFactory; if (KeyConfigGenerator) { diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp index 1a0a0974b2..e7b5363d8d 100644 --- a/ydb/core/testlib/basics/appdata.cpp +++ b/ydb/core/testlib/basics/appdata.cpp @@ -57,6 +57,7 @@ namespace NKikimr { app->SchemeShardConfig = SchemeShardConfig; app->MeteringConfig = MeteringConfig; app->AwsCompatibilityConfig = AwsCompatibilityConfig; + app->S3ProxyResolverConfig = S3ProxyResolverConfig; app->FeatureFlags = FeatureFlags; // This is a special setting active in test runtime only diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h index b902ce89cb..a17e2b0ac4 100644 --- a/ydb/core/testlib/basics/appdata.h +++ b/ydb/core/testlib/basics/appdata.h @@ -96,6 +96,7 @@ namespace NKikimr { NKikimrConfig::TMeteringConfig MeteringConfig; NKikimrPQ::TPQConfig PQConfig; NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig; + NKikimrConfig::TS3ProxyResolverConfig S3ProxyResolverConfig; private: TAutoPtr<TMine> Mine; diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt index 0159a00ee0..ca3bfea616 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt @@ -108,6 +108,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 @@ -376,6 +377,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index 0159a00ee0..ca3bfea616 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -108,6 +108,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 @@ -376,6 +377,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 414a952f2b..83f3d65fee 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -109,6 +109,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 @@ -378,6 +379,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 414a952f2b..83f3d65fee 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -109,6 +109,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 @@ -378,6 +379,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index 15228d5d60..99d4810d8a 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -109,6 +109,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 @@ -373,6 +374,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd library-actors-core + library-actors-http cpp-containers-absl_flat_hash cpp-containers-stack_vector cpp-digest-md5 diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h deleted file mode 100644 index c1e7cc8b08..0000000000 --- a/ydb/core/tx/datashard/export_s3_base_uploader.h +++ /dev/null @@ -1,548 +0,0 @@ -#pragma once -#ifndef KIKIMR_DISABLE_S3_OPS - -#include "datashard.h" -#include "export_common.h" -#include "export_s3.h" -#include "extstorage_usage_config.h" - -#include <ydb/core/base/appdata.h> -#include <ydb/core/protos/flat_scheme_op.pb.h> -#include <ydb/library/services/services.pb.h> -#include <ydb/core/wrappers/s3_storage_config.h> -#include <ydb/core/wrappers/s3_wrapper.h> -#include <ydb/core/wrappers/events/common.h> -#include <ydb/library/actors/core/actor_bootstrapped.h> -#include <ydb/library/actors/core/hfunc.h> -#include <library/cpp/random_provider/random_provider.h> - -#include <util/generic/buffer.h> -#include <util/generic/maybe.h> -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/string/builder.h> -#include <util/string/cast.h> - -#include <google/protobuf/text_format.h> - -namespace NKikimr { -namespace NDataShard { - -class IProxyOps { -public: - virtual ~IProxyOps() = default; - virtual bool NeedToResolveProxy() const = 0; - virtual void ResolveProxy() = 0; - -}; // IProxyOps - -template <typename TDerived> -class TS3UploaderBase: public TActorBootstrapped<TDerived> - , public IProxyOps -{ - using TEvExternalStorage = NWrappers::TEvExternalStorage; - using TEvBuffer = TEvExportScan::TEvBuffer<TBuffer>; - -protected: - void Restart() { - Y_ABORT_UNLESS(ProxyResolved); - - MultiPart = false; - Last = false; - Parts.clear(); - - if (Attempt) { - this->Send(std::exchange(Client, TActorId()), new TEvents::TEvPoisonPill()); - } - - Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); - - if (!MetadataUploaded) { - UploadMetadata(); - } else if (!SchemeUploaded) { - UploadScheme(); - } else { - this->Become(&TDerived::StateUploadData); - - if (Attempt) { - this->Send(std::exchange(Scanner, TActorId()), new TEvExportScan::TEvReset()); - } else if (Scanner) { - this->Send(Scanner, new TEvExportScan::TEvFeed()); - } - } - } - - void UploadScheme() { - Y_ABORT_UNLESS(!SchemeUploaded); - - if (!Scheme) { - return Finish(false, "Cannot infer scheme"); - } - - google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); - - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(Settings.GetSchemeKey()) - .WithStorageClass(Settings.GetStorageClass()); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - - this->Become(&TDerived::StateUploadScheme); - } - - void UploadMetadata() { - Y_ABORT_UNLESS(!MetadataUploaded); - - Buffer = std::move(Metadata); - - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(Settings.GetMetadataKey()) - .WithStorageClass(Settings.GetStorageClass()); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - - this->Become(&TDerived::StateUploadMetadata); - } - - void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("HandleScheme TEvExternalStorage::TEvPutObjectResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (!CheckResult(result, TStringBuf("PutObject (scheme)"))) { - return; - } - - SchemeUploaded = true; - - if (Scanner) { - this->Send(Scanner, new TEvExportScan::TEvFeed()); - } - - this->Become(&TDerived::StateUploadData); - } - - void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("HandleMetadata TEvExternalStorage::TEvPutObjectResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) { - return; - } - - MetadataUploaded = true; - - UploadScheme(); - } - - void Handle(TEvExportScan::TEvReady::TPtr& ev) { - EXPORT_LOG_D("Handle TEvExportScan::TEvReady" - << ": self# " << this->SelfId() - << ", sender# " << ev->Sender); - - Scanner = ev->Sender; - - if (Error) { - return PassAway(); - } - - if (ProxyResolved && SchemeUploaded && MetadataUploaded) { - this->Send(Scanner, new TEvExportScan::TEvFeed()); - } - } - - void Handle(TEvBuffer::TPtr& ev) { - EXPORT_LOG_D("Handle TEvExportScan::TEvBuffer" - << ": self# " << this->SelfId() - << ", sender# " << ev->Sender - << ", msg# " << ev->Get()->ToString()); - - if (ev->Sender != Scanner) { - EXPORT_LOG_W("Received buffer from unknown scanner" - << ": self# " << this->SelfId() - << ", sender# " << ev->Sender - << ", scanner# " << Scanner); - return; - } - - Last = ev->Get()->Last; - MultiPart = MultiPart || !Last; - ev->Get()->Buffer.AsString(Buffer); - - UploadData(); - } - - void UploadData() { - if (!MultiPart) { - auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) - .WithStorageClass(Settings.GetStorageClass()); - this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); - } else { - if (!UploadId) { - this->Send(DataShard, new TEvDataShard::TEvGetS3Upload(this->SelfId(), TxId)); - return; - } - - auto request = Aws::S3::Model::UploadPartRequest() - .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) - .WithUploadId(*UploadId) - .WithPartNumber(Parts.size() + 1); - this->Send(Client, new TEvExternalStorage::TEvUploadPartRequest(request, std::move(Buffer))); - } - } - - void HandleData(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("HandleData TEvExternalStorage::TEvPutObjectResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (!CheckResult(result, TStringBuf("PutObject (data)"))) { - return; - } - - Finish(); - } - - void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) { - auto& upload = ev->Get()->Upload; - - EXPORT_LOG_D("Handle TEvDataShard::TEvS3Upload" - << ": self# " << this->SelfId() - << ", upload# " << upload); - - if (!upload) { - auto request = Aws::S3::Model::CreateMultipartUploadRequest() - .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) - .WithStorageClass(Settings.GetStorageClass()); - this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request)); - } else { - UploadId = upload->Id; - - switch (upload->Status) { - case TS3Upload::EStatus::UploadParts: - return UploadData(); - - case TS3Upload::EStatus::Complete: { - Parts = std::move(upload->Parts); - - TVector<Aws::S3::Model::CompletedPart> parts(Reserve(Parts.size())); - for (ui32 partIndex = 0; partIndex < Parts.size(); ++partIndex) { - parts.emplace_back(Aws::S3::Model::CompletedPart() - .WithPartNumber(partIndex + 1) - .WithETag(Parts.at(partIndex))); - } - - auto request = Aws::S3::Model::CompleteMultipartUploadRequest() - .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) - .WithUploadId(*UploadId) - .WithMultipartUpload(Aws::S3::Model::CompletedMultipartUpload().WithParts(std::move(parts))); - this->Send(Client, new TEvExternalStorage::TEvCompleteMultipartUploadRequest(request)); - break; - } - - case TS3Upload::EStatus::Abort: { - Error = std::move(upload->Error); - if (!Error) { - Error = "<empty>"; - } - - auto request = Aws::S3::Model::AbortMultipartUploadRequest() - .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) - .WithUploadId(*UploadId); - this->Send(Client, new TEvExternalStorage::TEvAbortMultipartUploadRequest(request)); - break; - } - } - } - } - - void Handle(TEvExternalStorage::TEvCreateMultipartUploadResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("Handle TEvExternalStorage::TEvCreateMultipartUploadResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (!CheckResult(result, TStringBuf("CreateMultipartUpload"))) { - return; - } - - this->Send(DataShard, new TEvDataShard::TEvStoreS3UploadId(this->SelfId(), TxId, result.GetResult().GetUploadId().c_str())); - } - - void Handle(TEvExternalStorage::TEvUploadPartResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("Handle TEvExternalStorage::TEvUploadPartResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (!CheckResult(result, TStringBuf("UploadPart"))) { - return; - } - - Parts.push_back(result.GetResult().GetETag().c_str()); - - if (Last) { - return Finish(); - } - - this->Send(Scanner, new TEvExportScan::TEvFeed()); - } - - void Handle(TEvExternalStorage::TEvCompleteMultipartUploadResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("Handle TEvExternalStorage::TEvCompleteMultipartUploadResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (result.IsSuccess()) { - return PassAway(); - } - - const auto& error = result.GetError(); - if (error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD) { - return PassAway(); - } - - if (CanRetry(error)) { - UploadId.Clear(); // force getting info after restart - Retry(); - } else { - Error = error.GetMessage().c_str(); - PassAway(); - } - } - - void Handle(TEvExternalStorage::TEvAbortMultipartUploadResponse::TPtr& ev) { - const auto& result = ev->Get()->Result; - - EXPORT_LOG_D("Handle TEvExternalStorage::TEvAbortMultipartUploadResponse" - << ": self# " << this->SelfId() - << ", result# " << result); - - if (result.IsSuccess()) { - return PassAway(); - } - - const auto& error = result.GetError(); - if (CanRetry(error)) { - UploadId.Clear(); // force getting info after restart - Retry(); - } else { - Y_ABORT_UNLESS(Error); - Error = TStringBuilder() << *Error << " Additionally, 'AbortMultipartUpload' has failed: " - << error.GetMessage(); - PassAway(); - } - } - - template <typename TResult> - bool CheckResult(const TResult& result, const TStringBuf marker) { - if (result.IsSuccess()) { - return true; - } - - EXPORT_LOG_E("Error at '" << marker << "'" - << ": self# " << this->SelfId() - << ", error# " << result); - RetryOrFinish(result.GetError()); - - return false; - } - - static bool ShouldRetry(const Aws::S3::S3Error& error) { - if (error.ShouldRetry()) { - return true; - } - - if ("TooManyRequests" == error.GetExceptionName()) { - return true; - } - - return false; - } - - bool CanRetry(const Aws::S3::S3Error& error) const { - return Attempt < Retries && ShouldRetry(error); - } - - void Retry() { - Delay = Min(Delay * ++Attempt, TDuration::Minutes(10)); - const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); - this->Schedule(Delay + random, new TEvents::TEvWakeup()); - } - - void RetryOrFinish(const Aws::S3::S3Error& error) { - if (CanRetry(error)) { - Retry(); - } else { - Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str()); - } - } - - void Finish(bool success = true, const TString& error = TString()) { - EXPORT_LOG_I("Finish" - << ": self# " << this->SelfId() - << ", success# " << success - << ", error# " << error - << ", multipart# " << MultiPart - << ", uploadId# " << UploadId); - - if (!success) { - Error = error; - } - - if (!MultiPart || !UploadId) { - if (!Scanner) { - return; - } - - PassAway(); - } else { - if (success) { - this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId, - TS3Upload::EStatus::Complete, std::move(Parts))); - } else { - this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId, - TS3Upload::EStatus::Abort, *Error)); - } - } - } - - void PassAway() override { - if (Scanner) { - this->Send(Scanner, new TEvExportScan::TEvFinish(Error.Empty(), Error.GetOrElse(TString()))); - } - - this->Send(Client, new TEvents::TEvPoisonPill()); - - IActor::PassAway(); - } - -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR; - } - - static constexpr TStringBuf LogPrefix() { - return "s3"sv; - } - - explicit TS3UploaderBase( - const TActorId& dataShard, ui64 txId, - const NKikimrSchemeOp::TBackupTask& task, - TMaybe<Ydb::Table::CreateTableRequest>&& scheme, - TString&& metadata) - : ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(task.GetS3Settings())) - , Settings(TS3Settings::FromBackupTask(task)) - , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) - , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task)) - , DataShard(dataShard) - , TxId(txId) - , Scheme(std::move(scheme)) - , Metadata(std::move(metadata)) - , Retries(task.GetNumberOfRetries()) - , Attempt(0) - , Delay(TDuration::Minutes(1)) - , SchemeUploaded(task.GetShardNum() == 0 ? false : true) - , MetadataUploaded(task.GetShardNum() == 0 ? false : true) - { - } - - void Bootstrap() { - EXPORT_LOG_D("Bootstrap" - << ": self# " << this->SelfId() - << ", attempt# " << Attempt); - - ProxyResolved = !NeedToResolveProxy(); - if (!ProxyResolved) { - ResolveProxy(); - } else { - Restart(); - } - } - - STATEFN(StateBase) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvExportScan::TEvReady, Handle); - - sFunc(TEvents::TEvWakeup, Bootstrap); - sFunc(TEvents::TEvPoisonPill, PassAway); - } - } - - STATEFN(StateUploadScheme) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleScheme); - default: - return StateBase(ev); - } - } - - STATEFN(StateUploadMetadata) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata); - default: - return StateBase(ev); - } - } - - STATEFN(StateUploadData) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvBuffer, Handle); - hFunc(TEvDataShard::TEvS3Upload, Handle); - - hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleData); - hFunc(TEvExternalStorage::TEvCreateMultipartUploadResponse, Handle); - hFunc(TEvExternalStorage::TEvUploadPartResponse, Handle); - hFunc(TEvExternalStorage::TEvCompleteMultipartUploadResponse, Handle); - hFunc(TEvExternalStorage::TEvAbortMultipartUploadResponse, Handle); - default: - return StateBase(ev); - } - } - -protected: - NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; - TS3Settings Settings; - const NBackupRestoreTraits::EDataFormat DataFormat; - const NBackupRestoreTraits::ECompressionCodec CompressionCodec; - bool ProxyResolved; - -private: - const TActorId DataShard; - const ui64 TxId; - const TMaybe<Ydb::Table::CreateTableRequest> Scheme; - const TString Metadata; - - const ui32 Retries; - ui32 Attempt; - - TActorId Client; - TDuration Delay; - bool SchemeUploaded; - bool MetadataUploaded; - bool MultiPart; - bool Last; - - TActorId Scanner; - TString Buffer; - - TMaybe<TString> UploadId; - TVector<TString> Parts; - TMaybe<TString> Error; - -}; // TS3UploaderBase - -} // NDataShard -} // NKikimr - -#endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 0ff72f3f04..9419cf2c9b 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -1,24 +1,655 @@ #ifndef KIKIMR_DISABLE_S3_OPS -#include "export_s3_base_uploader.h" - #include "backup_restore_common.h" +#include "datashard.h" +#include "export_common.h" +#include "export_s3.h" +#include "extstorage_usage_config.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/library/services/services.pb.h> +#include <ydb/core/wrappers/s3_storage_config.h> +#include <ydb/core/wrappers/s3_wrapper.h> +#include <ydb/core/wrappers/events/common.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/http/http_proxy.h> +#include <library/cpp/random_provider/random_provider.h> + +#include <util/generic/buffer.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/string/builder.h> +#include <util/string/cast.h> + +#include <google/protobuf/text_format.h> namespace NKikimr { namespace NDataShard { -class TS3Uploader: public TS3UploaderBase<TS3Uploader> { -protected: - bool NeedToResolveProxy() const override { +class TS3Uploader: public TActorBootstrapped<TS3Uploader> { + using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig; + using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig; + using TEvExternalStorage = NWrappers::TEvExternalStorage; + using TEvBuffer = TEvExportScan::TEvBuffer<TBuffer>; + + static TMaybe<THttpResolverConfig> GetHttpResolverConfig(TStringBuf endpoint) { + for (const auto& entry : AppData()->S3ProxyResolverConfig.GetEndpoints()) { + if (entry.GetEndpoint() == endpoint && entry.HasHttpResolver()) { + return entry.GetHttpResolver(); + } + } + + return Nothing(); + } + + static TStringBuf NormalizeEndpoint(TStringBuf endpoint) { + Y_UNUSED(endpoint.SkipPrefix("http://") || endpoint.SkipPrefix("https://")); + Y_UNUSED(endpoint.ChopSuffix(":80") || endpoint.ChopSuffix(":443")); + return endpoint; + } + + static TMaybe<THttpResolverConfig> GetHttpResolverConfig(const TS3ExternalStorageConfig& settings) { + return GetHttpResolverConfig(NormalizeEndpoint(settings.GetConfig().endpointOverride)); + } + + std::shared_ptr<TS3ExternalStorageConfig> GetS3StorageConfig() const { + return std::dynamic_pointer_cast<TS3ExternalStorageConfig>(ExternalStorageConfig); + } + + TString GetResolveProxyUrl(const TS3ExternalStorageConfig& settings) const { + Y_ABORT_UNLESS(HttpResolverConfig); + + TStringBuilder url; + switch (settings.GetConfig().scheme) { + case Aws::Http::Scheme::HTTP: + url << "http://"; + break; + case Aws::Http::Scheme::HTTPS: + url << "https://"; + break; + } + + url << HttpResolverConfig->GetResolveUrl(); + return url; + } + + void ApplyProxy(TS3ExternalStorageConfig& settings, const TString& proxyHost) const { + Y_ABORT_UNLESS(HttpResolverConfig); + + settings.ConfigRef().proxyScheme = settings.GetConfig().scheme; + settings.ConfigRef().proxyHost = proxyHost; + settings.ConfigRef().proxyCaPath = settings.GetConfig().caPath; + + switch (settings.GetConfig().proxyScheme) { + case Aws::Http::Scheme::HTTP: + settings.ConfigRef().proxyPort = HttpResolverConfig->GetHttpPort(); + break; + case Aws::Http::Scheme::HTTPS: + settings.ConfigRef().proxyPort = HttpResolverConfig->GetHttpsPort(); + break; + } + } + + void ResolveProxy() { + if (!HttpProxy) { + HttpProxy = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance())); + } + + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest( + NHttp::THttpOutgoingRequest::CreateRequestGet(GetResolveProxyUrl(*GetS3StorageConfig())), + TDuration::Seconds(10) + )); + + Become(&TThis::StateResolveProxy); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev) { + const auto& msg = *ev->Get(); + + EXPORT_LOG_D("Handle NHttp::TEvHttpProxy::TEvHttpIncomingResponse" + << ": self# " << SelfId() + << ", status# " << (msg.Response ? msg.Response->Status : "null") + << ", body# " << (msg.Response ? msg.Response->Body : "null")); + + if (!msg.Response || !msg.Response->Status.StartsWith("200")) { + EXPORT_LOG_E("Error at 'GetProxy'" + << ": self# " << SelfId() + << ", error# " << msg.GetError()); + return RetryOrFinish(Aws::S3::S3Error({Aws::S3::S3Errors::SERVICE_UNAVAILABLE, true})); + } + + if (msg.Response->Body.find('<') != TStringBuf::npos) { + EXPORT_LOG_E("Error at 'GetProxy'" + << ": self# " << SelfId() + << ", error# " << "invalid body" + << ", body# " << msg.Response->Body); + return RetryOrFinish(Aws::S3::S3Error({Aws::S3::S3Errors::SERVICE_UNAVAILABLE, true})); + } + + ApplyProxy(*GetS3StorageConfig(), TString(msg.Response->Body)); + ProxyResolved = true; + + const auto& cfg = GetS3StorageConfig()->GetConfig(); + EXPORT_LOG_N("Using proxy: " + << (cfg.proxyScheme == Aws::Http::Scheme::HTTPS ? "https://" : "http://") + << cfg.proxyHost << ":" << cfg.proxyPort); + + Restart(); + } + + void Restart() { + Y_ABORT_UNLESS(ProxyResolved); + + MultiPart = false; + Last = false; + Parts.clear(); + + if (Attempt) { + this->Send(std::exchange(Client, TActorId()), new TEvents::TEvPoisonPill()); + } + + Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); + + if (!MetadataUploaded) { + UploadMetadata(); + } else if (!SchemeUploaded) { + UploadScheme(); + } else { + this->Become(&TThis::StateUploadData); + + if (Attempt) { + this->Send(std::exchange(Scanner, TActorId()), new TEvExportScan::TEvReset()); + } else if (Scanner) { + this->Send(Scanner, new TEvExportScan::TEvFeed()); + } + } + } + + void UploadScheme() { + Y_ABORT_UNLESS(!SchemeUploaded); + + if (!Scheme) { + return Finish(false, "Cannot infer scheme"); + } + + google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Settings.GetSchemeKey()) + .WithStorageClass(Settings.GetStorageClass()); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); + + this->Become(&TThis::StateUploadScheme); + } + + void UploadMetadata() { + Y_ABORT_UNLESS(!MetadataUploaded); + + Buffer = std::move(Metadata); + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Settings.GetMetadataKey()) + .WithStorageClass(Settings.GetStorageClass()); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); + + this->Become(&TThis::StateUploadMetadata); + } + + void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("HandleScheme TEvExternalStorage::TEvPutObjectResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("PutObject (scheme)"))) { + return; + } + + SchemeUploaded = true; + + if (Scanner) { + this->Send(Scanner, new TEvExportScan::TEvFeed()); + } + + this->Become(&TThis::StateUploadData); + } + + void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("HandleMetadata TEvExternalStorage::TEvPutObjectResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) { + return; + } + + MetadataUploaded = true; + + UploadScheme(); + } + + void Handle(TEvExportScan::TEvReady::TPtr& ev) { + EXPORT_LOG_D("Handle TEvExportScan::TEvReady" + << ": self# " << this->SelfId() + << ", sender# " << ev->Sender); + + Scanner = ev->Sender; + + if (Error) { + return PassAway(); + } + + if (ProxyResolved && SchemeUploaded && MetadataUploaded) { + this->Send(Scanner, new TEvExportScan::TEvFeed()); + } + } + + void Handle(TEvBuffer::TPtr& ev) { + EXPORT_LOG_D("Handle TEvExportScan::TEvBuffer" + << ": self# " << this->SelfId() + << ", sender# " << ev->Sender + << ", msg# " << ev->Get()->ToString()); + + if (ev->Sender != Scanner) { + EXPORT_LOG_W("Received buffer from unknown scanner" + << ": self# " << this->SelfId() + << ", sender# " << ev->Sender + << ", scanner# " << Scanner); + return; + } + + Last = ev->Get()->Last; + MultiPart = MultiPart || !Last; + ev->Get()->Buffer.AsString(Buffer); + + UploadData(); + } + + void UploadData() { + if (!MultiPart) { + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithStorageClass(Settings.GetStorageClass()); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); + } else { + if (!UploadId) { + this->Send(DataShard, new TEvDataShard::TEvGetS3Upload(this->SelfId(), TxId)); + return; + } + + auto request = Aws::S3::Model::UploadPartRequest() + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithUploadId(*UploadId) + .WithPartNumber(Parts.size() + 1); + this->Send(Client, new TEvExternalStorage::TEvUploadPartRequest(request, std::move(Buffer))); + } + } + + void HandleData(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("HandleData TEvExternalStorage::TEvPutObjectResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("PutObject (data)"))) { + return; + } + + Finish(); + } + + void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) { + auto& upload = ev->Get()->Upload; + + EXPORT_LOG_D("Handle TEvDataShard::TEvS3Upload" + << ": self# " << this->SelfId() + << ", upload# " << upload); + + if (!upload) { + auto request = Aws::S3::Model::CreateMultipartUploadRequest() + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithStorageClass(Settings.GetStorageClass()); + this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request)); + } else { + UploadId = upload->Id; + + switch (upload->Status) { + case TS3Upload::EStatus::UploadParts: + return UploadData(); + + case TS3Upload::EStatus::Complete: { + Parts = std::move(upload->Parts); + + TVector<Aws::S3::Model::CompletedPart> parts(Reserve(Parts.size())); + for (ui32 partIndex = 0; partIndex < Parts.size(); ++partIndex) { + parts.emplace_back(Aws::S3::Model::CompletedPart() + .WithPartNumber(partIndex + 1) + .WithETag(Parts.at(partIndex))); + } + + auto request = Aws::S3::Model::CompleteMultipartUploadRequest() + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithUploadId(*UploadId) + .WithMultipartUpload(Aws::S3::Model::CompletedMultipartUpload().WithParts(std::move(parts))); + this->Send(Client, new TEvExternalStorage::TEvCompleteMultipartUploadRequest(request)); + break; + } + + case TS3Upload::EStatus::Abort: { + Error = std::move(upload->Error); + if (!Error) { + Error = "<empty>"; + } + + auto request = Aws::S3::Model::AbortMultipartUploadRequest() + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithUploadId(*UploadId); + this->Send(Client, new TEvExternalStorage::TEvAbortMultipartUploadRequest(request)); + break; + } + } + } + } + + void Handle(TEvExternalStorage::TEvCreateMultipartUploadResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("Handle TEvExternalStorage::TEvCreateMultipartUploadResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("CreateMultipartUpload"))) { + return; + } + + this->Send(DataShard, new TEvDataShard::TEvStoreS3UploadId(this->SelfId(), TxId, result.GetResult().GetUploadId().c_str())); + } + + void Handle(TEvExternalStorage::TEvUploadPartResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("Handle TEvExternalStorage::TEvUploadPartResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("UploadPart"))) { + return; + } + + Parts.push_back(result.GetResult().GetETag().c_str()); + + if (Last) { + return Finish(); + } + + this->Send(Scanner, new TEvExportScan::TEvFeed()); + } + + void Handle(TEvExternalStorage::TEvCompleteMultipartUploadResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("Handle TEvExternalStorage::TEvCompleteMultipartUploadResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (result.IsSuccess()) { + return PassAway(); + } + + const auto& error = result.GetError(); + if (error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD) { + return PassAway(); + } + + if (CanRetry(error)) { + UploadId.Clear(); // force getting info after restart + Retry(); + } else { + Error = error.GetMessage().c_str(); + PassAway(); + } + } + + void Handle(TEvExternalStorage::TEvAbortMultipartUploadResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("Handle TEvExternalStorage::TEvAbortMultipartUploadResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (result.IsSuccess()) { + return PassAway(); + } + + const auto& error = result.GetError(); + if (CanRetry(error)) { + UploadId.Clear(); // force getting info after restart + Retry(); + } else { + Y_ABORT_UNLESS(Error); + Error = TStringBuilder() << *Error << " Additionally, 'AbortMultipartUpload' has failed: " + << error.GetMessage(); + PassAway(); + } + } + + template <typename TResult> + bool CheckResult(const TResult& result, const TStringBuf marker) { + if (result.IsSuccess()) { + return true; + } + + EXPORT_LOG_E("Error at '" << marker << "'" + << ": self# " << this->SelfId() + << ", error# " << result); + RetryOrFinish(result.GetError()); + + return false; + } + + static bool ShouldRetry(const Aws::S3::S3Error& error) { + if (error.ShouldRetry()) { + return true; + } + + if ("TooManyRequests" == error.GetExceptionName()) { + return true; + } + return false; } - void ResolveProxy() override { - Y_ABORT("unreachable"); + bool CanRetry(const Aws::S3::S3Error& error) const { + return Attempt < Retries && ShouldRetry(error); + } + + void Retry() { + Delay = Min(Delay * ++Attempt, TDuration::Minutes(10)); + const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); + this->Schedule(Delay + random, new TEvents::TEvWakeup()); + } + + void RetryOrFinish(const Aws::S3::S3Error& error) { + if (CanRetry(error)) { + Retry(); + } else { + Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str()); + } + } + + void Finish(bool success = true, const TString& error = TString()) { + EXPORT_LOG_I("Finish" + << ": self# " << this->SelfId() + << ", success# " << success + << ", error# " << error + << ", multipart# " << MultiPart + << ", uploadId# " << UploadId); + + if (!success) { + Error = error; + } + + if (!MultiPart || !UploadId) { + if (!Scanner) { + return; + } + + PassAway(); + } else { + if (success) { + this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId, + TS3Upload::EStatus::Complete, std::move(Parts))); + } else { + this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId, + TS3Upload::EStatus::Abort, *Error)); + } + } + } + + void PassAway() override { + if (HttpProxy) { + Send(HttpProxy, new TEvents::TEvPoisonPill()); + } + + if (Scanner) { + this->Send(Scanner, new TEvExportScan::TEvFinish(Error.Empty(), Error.GetOrElse(TString()))); + } + + this->Send(Client, new TEvents::TEvPoisonPill()); + + IActor::PassAway(); } public: - using TS3UploaderBase::TS3UploaderBase; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR; + } + + static constexpr TStringBuf LogPrefix() { + return "s3"sv; + } + + explicit TS3Uploader( + const TActorId& dataShard, ui64 txId, + const NKikimrSchemeOp::TBackupTask& task, + TMaybe<Ydb::Table::CreateTableRequest>&& scheme, + TString&& metadata) + : ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings())) + , Settings(TS3Settings::FromBackupTask(task)) + , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) + , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task)) + , HttpResolverConfig(GetHttpResolverConfig(*GetS3StorageConfig())) + , DataShard(dataShard) + , TxId(txId) + , Scheme(std::move(scheme)) + , Metadata(std::move(metadata)) + , Retries(task.GetNumberOfRetries()) + , Attempt(0) + , Delay(TDuration::Minutes(1)) + , SchemeUploaded(task.GetShardNum() == 0 ? false : true) + , MetadataUploaded(task.GetShardNum() == 0 ? false : true) + { + } + + void Bootstrap() { + EXPORT_LOG_D("Bootstrap" + << ": self# " << this->SelfId() + << ", attempt# " << Attempt); + + ProxyResolved = !HttpResolverConfig.Defined(); + if (!ProxyResolved) { + ResolveProxy(); + } else { + Restart(); + } + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExportScan::TEvReady, Handle); + + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateResolveProxy) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadScheme) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleScheme); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadMetadata) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadData) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvBuffer, Handle); + hFunc(TEvDataShard::TEvS3Upload, Handle); + + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleData); + hFunc(TEvExternalStorage::TEvCreateMultipartUploadResponse, Handle); + hFunc(TEvExternalStorage::TEvUploadPartResponse, Handle); + hFunc(TEvExternalStorage::TEvCompleteMultipartUploadResponse, Handle); + hFunc(TEvExternalStorage::TEvAbortMultipartUploadResponse, Handle); + default: + return StateBase(ev); + } + } + +private: + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + TS3Settings Settings; + const NBackupRestoreTraits::EDataFormat DataFormat; + const NBackupRestoreTraits::ECompressionCodec CompressionCodec; + bool ProxyResolved; + + TMaybe<THttpResolverConfig> HttpResolverConfig; + TActorId HttpProxy; + + const TActorId DataShard; + const ui64 TxId; + const TMaybe<Ydb::Table::CreateTableRequest> Scheme; + const TString Metadata; + + const ui32 Retries; + ui32 Attempt; + + TActorId Client; + TDuration Delay; + bool SchemeUploaded; + bool MetadataUploaded; + bool MultiPart; + bool Last; + + TActorId Scanner; + TString Buffer; + + TMaybe<TString> UploadId; + TVector<TString> Parts; + TMaybe<TString> Error; }; // TS3Uploader diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 6ef1f11b1f..dde0d8d105 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -216,6 +216,7 @@ RESOURCE( PEERDIR( contrib/libs/zstd ydb/library/actors/core + ydb/library/actors/http library/cpp/containers/absl_flat_hash library/cpp/containers/stack_vector library/cpp/digest/md5 |