diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-03-28 18:49:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-28 18:49:20 +0300 |
commit | 8114eed7e30d7d91067524d384ae2e4d7b20b334 (patch) | |
tree | 9fed7ee0202f6b6848dd366ab0c2ffcbe4406eb2 | |
parent | 206e2d4b74e23bb2276a6362691bb3683c2054f8 (diff) | |
download | ydb-8114eed7e30d7d91067524d384ae2e4d7b20b334.tar.gz |
[CB] Add simple s3_writer implementation (#3152)
-rw-r--r-- | ydb/core/tx/replication/service/s3_writer.cpp | 253 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/s3_writer.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/s3_writer_ut.cpp | 93 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/ut_s3_writer/ya.make | 21 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/ya.make | 3 | ||||
-rw-r--r-- | ydb/library/services/services.proto | 2 |
7 files changed, 386 insertions, 0 deletions
diff --git a/ydb/core/tx/replication/service/s3_writer.cpp b/ydb/core/tx/replication/service/s3_writer.cpp
new file mode 100644
index 00000000000..a519ceda989
--- /dev/null
+++ b/ydb/core/tx/replication/service/s3_writer.cpp
@@ -0,0 +1,290 @@
+#include "json_change_record.h"
+#include "logging.h"
+#include "s3_writer.h"
+#include "worker.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/services/services.pb.h>
+
+#include <library/cpp/json/json_writer.h>
+
+#include <util/generic/maybe.h>
+#include <util/string/builder.h>
+
+#define CB_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+
+namespace {
+
+// Builds the S3 key of a data part: "part.<firstOffset>.<writerName>.jsonl".
+// The offset is streamed rather than printf-formatted so that every ui64 value
+// is rendered as an unsigned decimal number.
+TString GetPartKey(ui64 firstOffset, const TString& writerName) {
+    return TStringBuilder() << "part." << firstOffset << "." << writerName << ".jsonl";
+}
+
+// Builds the S3 key of the writer identity document: "writer.<writerName>.json".
+TString GetIdentityKey(const TString& writerName) {
+    return TStringBuilder() << "writer." << writerName << ".json";
+}
+
+} // anonymous namespace
+
+namespace NKikimr::NReplication::NService {
+
+using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig;
+using TEvExternalStorage = NWrappers::TEvExternalStorage;
+
+// A PutObject request bundled with the payload it uploads. The pair is kept
+// until the upload succeeds so that it can be re-sent on retry.
+struct TS3Request {
+    TS3Request(Aws::S3::Model::PutObjectRequest&& request, TString&& buffer)
+        : Request(std::move(request))
+        , Buffer(std::move(buffer))
+    {}
+
+    Aws::S3::Model::PutObjectRequest Request;
+    TString Buffer;
+};
+
+// Actor that uploads change records received from a worker to S3.
+//
+// Protocol, as implemented by the handlers below:
+//  - TEvWorker::TEvHandshake: remembers the sender as the worker, starts an S3
+//    wrapper actor and uploads the identity document; the handshake is answered
+//    once that upload succeeds.
+//  - TEvWorker::TEvData with records: uploads them as one part object and asks
+//    for more with TEvWorker::TEvPoll once the upload succeeds.
+//  - TEvWorker::TEvData without records: marks the writer finished, rewrites the
+//    identity document and reports TEvWorker::TEvGone(DONE).
+//  - A failed upload is retried with a growing randomized delay; when it cannot
+//    be retried the worker gets TEvWorker::TEvGone(S3_ERROR) and the actor dies.
+//
+// TODO try to batch
+// TODO add external way to configure retries
+// TODO add sensors
+class TS3Writer
+    : public TActor<TS3Writer>
+{
+    // Lazily builds and caches the prefix prepended to every log line.
+    TStringBuf GetLogPrefix() const {
+        if (!LogPrefix) {
+            LogPrefix = TStringBuilder()
+                << "[S3Writer]"
+                << TableName
+                << SelfId() << " ";
+        }
+
+        return LogPrefix.GetRef();
+    }
+
+    // Returns true when `result` is a success; otherwise logs the failure and
+    // either schedules a retry or leaves.
+    template <typename TResult>
+    bool CheckResult(const TResult& result, const TStringBuf marker) {
+        if (result.IsSuccess()) {
+            return true;
+        }
+
+        CB_LOG_E("Error at '" << marker << "'"
+            << ", error# " << result);
+        RetryOrLeave(result.GetError());
+
+        return false;
+    }
+
+    // An error is retried when the SDK marks it retryable or when its exception
+    // name is "TooManyRequests".
+    static bool ShouldRetry(const Aws::S3::S3Error& error) {
+        return error.ShouldRetry() || "TooManyRequests" == error.GetExceptionName();
+    }
+
+    bool CanRetry(const Aws::S3::S3Error& error) const {
+        return Attempt < Retries && ShouldRetry(error);
+    }
+
+    // Schedules a wakeup that re-sends the request in flight. The current delay
+    // is multiplied by the attempt number (capped at MaxDelay) and a random
+    // extra in [0, Delay) is added on top.
+    void Retry() {
+        Delay = Min(Delay * ++Attempt, MaxDelay);
+        const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
+        this->Schedule(Delay + random, new TEvents::TEvWakeup());
+    }
+
+    // Restores the retry budget and the base delay, so that retries spent on
+    // one upload do not count against the following ones.
+    void ResetRetryState() {
+        Attempt = 0;
+        Delay = InitialDelay;
+    }
+
+    void RetryOrLeave(const Aws::S3::S3Error& error) {
+        if (CanRetry(error)) {
+            Retry();
+        } else {
+            Leave(TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
+        }
+    }
+
+    // Reports a fatal error to the worker and terminates the actor.
+    void Leave(const TString& error) {
+        CB_LOG_I("Leave"
+            << ": error# " << error);
+
+        // TODO support different error kinds
+        Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::S3_ERROR));
+
+        PassAway();
+    }
+
+    // Sends a copy of the request in flight; the original is kept for retries.
+    void SendS3Request() {
+        Y_VERIFY(RequestInFlight);
+        Send(S3Client, new TEvExternalStorage::TEvPutObjectRequest(RequestInFlight->Request, TString(RequestInFlight->Buffer)));
+    }
+
+    void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+        Worker = ev->Sender;
+        CB_LOG_D("Handshake"
+            << ": worker# " << Worker);
+
+        S3Client = RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+
+        WriteIdentity();
+    }
+
+    // Uploads the identity document describing this writer and whether it has
+    // finished.
+    void WriteIdentity() {
+        const TString key = GetIdentityKey(WriterName);
+
+        auto request = Aws::S3::Model::PutObjectRequest()
+            .WithKey(key);
+
+        auto identity = NJson::TJsonMap{
+            {"finished", Finished},
+            {"table_name", TableName},
+            {"writer_name", WriterName},
+        };
+
+        TString buffer = NJson::WriteJson(identity, false);
+
+        RequestInFlight = std::make_unique<TS3Request>(std::move(request), std::move(buffer));
+        SendS3Request();
+    }
+
+    void PassAway() override {
+        // The S3 wrapper is only created on handshake; there is nothing to stop
+        // if the actor dies before that.
+        if (S3Client) {
+            Send(S3Client, new TEvents::TEvPoison());
+        }
+        TActor::PassAway();
+    }
+
+    void Handle(TEvWorker::TEvData::TPtr& ev) {
+        CB_LOG_D("Handle " << ev->Get()->ToString());
+
+        // An empty batch is treated as the end of the stream.
+        if (!ev->Get()->Records) {
+            Finished = true;
+            WriteIdentity();
+            return;
+        }
+
+        const TString key = GetPartKey(ev->Get()->Records[0].Offset, WriterName);
+
+        auto request = Aws::S3::Model::PutObjectRequest()
+            .WithKey(key);
+
+        TStringBuilder buffer;
+
+        for (const auto& rec : ev->Get()->Records) {
+            buffer << rec.Data << '\n';
+        }
+
+        RequestInFlight = std::make_unique<TS3Request>(std::move(request), std::move(buffer));
+        SendS3Request();
+    }
+
+    void Handle(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+        const auto& result = ev->Get()->Result;
+
+        CB_LOG_D("Handle " << ev->Get()->ToString());
+
+        if (!CheckResult(result, TStringBuf("PutObject"))) {
+            return;
+        }
+
+        RequestInFlight = nullptr;
+        ResetRetryState();
+
+        if (!IdentityWritten) {
+            IdentityWritten = true;
+            Send(Worker, new TEvWorker::TEvHandshake());
+        } else if (!Finished) {
+            Send(Worker, new TEvWorker::TEvPoll());
+        } else {
+            Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
+        }
+    }
+
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::REPLICATION_S3_WRITER;
+    }
+
+    explicit TS3Writer(
+        NWrappers::IExternalStorageConfig::TPtr&& s3Settings,
+        const TString& tableName,
+        const TString& writerName)
+        : TActor(&TThis::StateWork)
+        , ExternalStorageConfig(std::move(s3Settings))
+        , TableName(tableName)
+        , WriterName(writerName)
+    {}
+
+    STFUNC(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvWorker::TEvHandshake, Handle);
+            hFunc(TEvWorker::TEvData, Handle);
+            hFunc(TEvExternalStorage::TEvPutObjectResponse, Handle);
+            sFunc(TEvents::TEvWakeup, SendS3Request);
+            sFunc(TEvents::TEvPoison, PassAway);
+        }
+    }
+
+private:
+    NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
+    mutable TMaybe<TString> LogPrefix;
+    const TString TableName;
+    const TString WriterName;
+    TActorId Worker;
+    TActorId S3Client;
+    bool IdentityWritten = false;
+    bool Finished = false;
+
+    std::unique_ptr<TS3Request> RequestInFlight = nullptr;
+
+    const ui32 Retries = 3;
+    ui32 Attempt = 0;
+
+    static constexpr TDuration InitialDelay = TDuration::Minutes(1);
+    static constexpr TDuration MaxDelay = TDuration::Minutes(10);
+    TDuration Delay = InitialDelay;
+}; // TS3Writer
+
+IActor* CreateS3Writer(NWrappers::IExternalStorageConfig::TPtr&& s3Settings, const TString& tableName, const TString& writerName) {
+    return new TS3Writer(std::move(s3Settings), tableName, writerName);
+}
+
+}
diff --git a/ydb/core/tx/replication/service/s3_writer.h b/ydb/core/tx/replication/service/s3_writer.h
new file mode 100644
index 00000000000..8e5ee121935
--- /dev/null
+++ b/ydb/core/tx/replication/service/s3_writer.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+
+#include <util/generic/string.h>
+
+namespace NKikimr::NReplication::NService {
+
+IActor* CreateS3Writer(NWrappers::IExternalStorageConfig::TPtr&& s3Settings, const TString& tableName, const TString& writerName);
+
+}
diff --git a/ydb/core/tx/replication/service/s3_writer_ut.cpp b/ydb/core/tx/replication/service/s3_writer_ut.cpp
new file mode 100644
index 00000000000..bb1a075b06e
---
/dev/null
+++ b/ydb/core/tx/replication/service/s3_writer_ut.cpp
@@ -0,0 +1,97 @@
+#include "s3_writer.h"
+#include "worker.h"
+
+#include <ydb/core/tx/replication/ut_helpers/test_env.h>
+#include <ydb/core/tx/replication/ut_helpers/test_table.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/string/printf.h>
+
+namespace NKikimr::NReplication::NService {
+
+Y_UNIT_TEST_SUITE(S3Writer) {
+    using namespace NTestHelpers;
+
+    // Drives a writer through a handshake, one batch of records and an empty
+    // batch, checking the contents of the S3 mock after every step.
+    Y_UNIT_TEST(WriteTableS3) {
+        using namespace NWrappers::NTestHelpers;
+        TPortManager portManager;
+        const ui16 port = portManager.GetPort();
+
+        TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+        UNIT_ASSERT(s3Mock.Start());
+
+        TString settings = Sprintf(R"(
+            endpoint: "localhost:%d"
+            scheme: HTTP
+            bucket: "TEST"
+            items {
+                source_path: "/MyRoot/Table"
+                destination_prefix: ""
+            }
+        )", port);
+        Ydb::Export::ExportToS3Settings request;
+        UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(settings, &request));
+
+        auto config = std::make_shared<NWrappers::NExternalStorage::TS3ExternalStorageConfig>(request);
+
+        TEnv env;
+        env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);
+        // The writer logs under CONTINUOUS_BACKUP, so raise that component too.
+        env.GetRuntime().SetLogPriority(NKikimrServices::CONTINUOUS_BACKUP, NLog::PRI_DEBUG);
+
+        env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{
+            .Name = "Table",
+            .KeyColumns = {"key"},
+            .Columns = {
+                {.Name = "key", .Type = "Uint32"},
+                {.Name = "value", .Type = "Utf8"},
+            },
+        }));
+
+        TString writerUuid = "AtufpxzetsqaVnEuozdXpD"; // basically base58-encoded uuid4
+
+        auto writer = env.GetRuntime().Register(CreateS3Writer(config, "/MyRoot/Table", writerUuid));
+        env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
+
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
+                                 R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
+
+        using TRecord = TEvWorker::TEvData::TRecord;
+        env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
+            TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
+            TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
+            TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
+        }));
+
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
+                                 R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/part.1.AtufpxzetsqaVnEuozdXpD.jsonl"),
+                                 R"({"key":[1], "update":{"value":"10"}})" "\n"
+                                 R"({"key":[2], "update":{"value":"20"}})" "\n"
+                                 R"({"key":[3], "update":{"value":"30"}})" "\n");
+
+        auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData({}));
+
+        UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, TEvWorker::TEvGone::DONE);
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
+                                 R"({"finished":true,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
+        UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/part.1.AtufpxzetsqaVnEuozdXpD.jsonl"),
+                                 R"({"key":[1], "update":{"value":"10"}})" "\n"
+                                 R"({"key":[2], "update":{"value":"20"}})" "\n"
+                                 R"({"key":[3], "update":{"value":"30"}})" "\n");
+    }
+
+    // TODO test all retry behavior
+}
+
+}
diff --git a/ydb/core/tx/replication/service/ut_s3_writer/ya.make b/ydb/core/tx/replication/service/ut_s3_writer/ya.make
new file mode 100644
index 00000000000..b15903f35c2
--- /dev/null
+++ b/ydb/core/tx/replication/service/ut_s3_writer/ya.make
@@ -0,0 +1,21 @@
+UNITTEST_FOR(ydb/core/tx/replication/service)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+ +TIMEOUT(600) + +PEERDIR( + ydb/core/tx/replication/ut_helpers + library/cpp/string_utils/base64 + library/cpp/testing/unittest +) + +SRCS( + s3_writer_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 523214190ac..f89f2571729 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -45,6 +45,8 @@ struct TEvWorker { struct TEvGone: public TEventLocal<TEvGone, EvGone> { enum EStatus { + DONE, + S3_ERROR, SCHEME_ERROR, UNAVAILABLE, }; diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make index cae33d0cdbb..dda4ba4a11f 100644 --- a/ydb/core/tx/replication/service/ya.make +++ b/ydb/core/tx/replication/service/ya.make @@ -12,6 +12,7 @@ PEERDIR( ydb/core/tx/replication/ydb_proxy ydb/library/actors/core ydb/library/services + ydb/core/wrappers library/cpp/json ) @@ -19,6 +20,7 @@ SRCS( json_change_record.cpp service.cpp table_writer.cpp + s3_writer.cpp topic_reader.cpp worker.cpp ) @@ -30,6 +32,7 @@ YQL_LAST_ABI_VERSION() END() RECURSE_FOR_TESTS( + ut_s3_writer ut_table_writer ut_topic_reader ut_worker diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 14d26f434c7..aeccf0da381 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -288,6 +288,7 @@ enum EServiceKikimr { DATASHARD_RESTORE = 801; IMPORT = 802; S3_WRAPPER = 803; + CONTINUOUS_BACKUP = 804; // System views SYSTEM_VIEWS = 900; @@ -1025,5 +1026,6 @@ message TActivity { PERSQUEUE_ACCOUNT_WRITE_QUOTER = 631; PERSQUEUE_WRITE_QUOTER = 632; REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR = 633; + REPLICATION_S3_WRITER = 634; }; }; |