aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorInnokentii Mokin <innokentii@ydb.tech>2024-03-28 18:49:20 +0300
committerGitHub <noreply@github.com>2024-03-28 18:49:20 +0300
commit8114eed7e30d7d91067524d384ae2e4d7b20b334 (patch)
tree9fed7ee0202f6b6848dd366ab0c2ffcbe4406eb2
parent206e2d4b74e23bb2276a6362691bb3683c2054f8 (diff)
downloadydb-8114eed7e30d7d91067524d384ae2e4d7b20b334.tar.gz
[CB] Add simple s3_writer implementation (#3152)
-rw-r--r--ydb/core/tx/replication/service/s3_writer.cpp253
-rw-r--r--ydb/core/tx/replication/service/s3_writer.h12
-rw-r--r--ydb/core/tx/replication/service/s3_writer_ut.cpp93
-rw-r--r--ydb/core/tx/replication/service/ut_s3_writer/ya.make21
-rw-r--r--ydb/core/tx/replication/service/worker.h2
-rw-r--r--ydb/core/tx/replication/service/ya.make3
-rw-r--r--ydb/library/services/services.proto2
7 files changed, 386 insertions, 0 deletions
diff --git a/ydb/core/tx/replication/service/s3_writer.cpp b/ydb/core/tx/replication/service/s3_writer.cpp
new file mode 100644
index 00000000000..a519ceda989
--- /dev/null
+++ b/ydb/core/tx/replication/service/s3_writer.cpp
@@ -0,0 +1,253 @@
+#include "json_change_record.h"
+#include "logging.h"
+#include "s3_writer.h"
+#include "worker.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/services/services.pb.h>
+
+#include <library/cpp/json/json_writer.h>
+
+#include <util/generic/maybe.h>
+#include <util/string/builder.h>
+
+#define CB_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+#define CB_LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::CONTINUOUS_BACKUP, GetLogPrefix() << stream)
+
+namespace {
+
+TString GetPartKey(ui64 firstOffset, const TString& writerName) {
+ return Sprintf("part.%ld.%s.jsonl", firstOffset, writerName.c_str());
+}
+
+TString GetIdentityKey(const TString& writerName) {
+ return Sprintf("writer.%s.json", writerName.c_str());
+}
+
+} // anonymous namespace
+
+namespace NKikimr::NReplication::NService {
+
+using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig;
+using TEvExternalStorage = NWrappers::TEvExternalStorage;
+
+struct TS3Request {
+ TS3Request(Aws::S3::Model::PutObjectRequest&& request, TString&& buffer)
+ : Request(std::move(request))
+ , Buffer(std::move(buffer))
+ {}
+
+ Aws::S3::Model::PutObjectRequest Request;
+ TString Buffer;
+};
+
+// TODO try to batch
+// TODO add external way to configure retries
+// TODO add sensors
+class TS3Writer
+ : public TActor<TS3Writer>
+{
+ TStringBuf GetLogPrefix() const {
+ if (!LogPrefix) {
+ LogPrefix = TStringBuilder()
+ << "[S3Writer]"
+ << TableName
+ << SelfId() << " ";
+ }
+
+ return LogPrefix.GetRef();
+ }
+
+ template <typename TResult>
+ bool CheckResult(const TResult& result, const TStringBuf marker) {
+ if (result.IsSuccess()) {
+ return true;
+ }
+
+ CB_LOG_E("Error at '" << marker << "'"
+ << ", error# " << result);
+ RetryOrLeave(result.GetError());
+
+ return false;
+ }
+
+ static bool ShouldRetry(const Aws::S3::S3Error& error) {
+ if (error.ShouldRetry()) {
+ return true;
+ }
+
+ if ("TooManyRequests" == error.GetExceptionName()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ bool CanRetry(const Aws::S3::S3Error& error) const {
+ return Attempt < Retries && ShouldRetry(error);
+ }
+
+ void Retry() {
+ Delay = Min(Delay * ++Attempt, MaxDelay);
+ const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
+ this->Schedule(Delay + random, new TEvents::TEvWakeup());
+ }
+
+ void RetryOrLeave(const Aws::S3::S3Error& error) {
+ if (CanRetry(error)) {
+ Retry();
+ } else {
+ Leave(TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
+ }
+ }
+
+ void Leave(const TString& error) {
+ CB_LOG_I("Leave"
+ << ": error# " << error);
+
+ // TODO support different error kinds
+ Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::S3_ERROR));
+
+ PassAway();
+ }
+
+ void SendS3Request() {
+ Y_VERIFY(RequestInFlight);
+ Send(S3Client, new TEvExternalStorage::TEvPutObjectRequest(RequestInFlight->Request, TString(RequestInFlight->Buffer)));
+ }
+
+ void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+ Worker = ev->Sender;
+ CB_LOG_D("Handshake"
+ << ": worker# " << Worker);
+
+ S3Client = RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+
+ WriteIdentity();
+ }
+
+ void WriteIdentity() {
+ const TString key = GetIdentityKey(WriterName);
+
+ auto request = Aws::S3::Model::PutObjectRequest()
+ .WithKey(key);
+
+ auto identity = NJson::TJsonMap{
+ {"finished", Finished},
+ {"table_name", TableName},
+ {"writer_name", WriterName},
+ };
+
+ TString buffer = NJson::WriteJson(identity, false);
+
+ RequestInFlight = std::make_unique<TS3Request>(std::move(request), std::move(buffer));
+ SendS3Request();
+ }
+
+ void PassAway() override {
+ Send(S3Client, new TEvents::TEvPoison());
+ TActor::PassAway();
+ }
+
+ void Handle(TEvWorker::TEvData::TPtr& ev) {
+ CB_LOG_D("Handle " << ev->Get()->ToString());
+
+ if (!ev->Get()->Records) {
+ Finished = true;
+ WriteIdentity();
+ return;
+ }
+
+ const TString key = GetPartKey(ev->Get()->Records[0].Offset, WriterName);
+
+ auto request = Aws::S3::Model::PutObjectRequest()
+ .WithKey(key);
+
+ TStringBuilder buffer;
+
+ for (auto& rec : ev->Get()->Records) {
+ buffer << rec.Data << '\n';
+ }
+
+ RequestInFlight = std::make_unique<TS3Request>(std::move(request), std::move(buffer));
+ SendS3Request();
+ }
+
+ void Handle(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+ const auto& result = ev->Get()->Result;
+
+ CB_LOG_D("Handle " << ev->Get()->ToString());
+
+ if (!CheckResult(result, TStringBuf("PutObject"))) {
+ return;
+ } else {
+ RequestInFlight = nullptr;
+ }
+
+ if (!IdentityWritten) {
+ IdentityWritten = true;
+ Send(Worker, new TEvWorker::TEvHandshake());
+ } else if (!Finished) {
+ Send(Worker, new TEvWorker::TEvPoll());
+ } else {
+ Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
+ }
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_S3_WRITER;
+ }
+
+ explicit TS3Writer(
+ NWrappers::IExternalStorageConfig::TPtr&& s3Settings,
+ const TString& tableName,
+ const TString& writerName)
+ : TActor(&TThis::StateWork)
+ , ExternalStorageConfig(std::move(s3Settings))
+ , TableName(tableName)
+ , WriterName(writerName)
+ {}
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvWorker::TEvHandshake, Handle);
+ hFunc(TEvWorker::TEvData, Handle);
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, Handle);
+ sFunc(TEvents::TEvWakeup, SendS3Request);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
+ mutable TMaybe<TString> LogPrefix;
+ const TString TableName;
+ const TString WriterName;
+ TActorId Worker;
+ TActorId S3Client;
+ bool IdentityWritten = false;
+ bool Finished = false;
+
+ std::unique_ptr<TS3Request> RequestInFlight = nullptr;
+
+ const ui32 Retries = 3;
+ ui32 Attempt = 0;
+
+ TDuration Delay = TDuration::Minutes(1);
+ static constexpr TDuration MaxDelay = TDuration::Minutes(10);
+}; // TS3Writer
+
+IActor* CreateS3Writer(NWrappers::IExternalStorageConfig::TPtr&& s3Settings, const TString& tableName, const TString& writerName) {
+ return new TS3Writer(std::move(s3Settings), tableName, writerName);
+}
+
+}
diff --git a/ydb/core/tx/replication/service/s3_writer.h b/ydb/core/tx/replication/service/s3_writer.h
new file mode 100644
index 00000000000..8e5ee121935
--- /dev/null
+++ b/ydb/core/tx/replication/service/s3_writer.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+
+#include <util/generic/string.h>
+
+namespace NKikimr::NReplication::NService {
+
+IActor* CreateS3Writer(NWrappers::IExternalStorageConfig::TPtr&& s3Settings, const TString& tableName, const TString& writerName);
+
+}
diff --git a/ydb/core/tx/replication/service/s3_writer_ut.cpp b/ydb/core/tx/replication/service/s3_writer_ut.cpp
new file mode 100644
index 00000000000..bb1a075b06e
--- /dev/null
+++ b/ydb/core/tx/replication/service/s3_writer_ut.cpp
@@ -0,0 +1,93 @@
+#include "s3_writer.h"
+#include "worker.h"
+
+#include <ydb/core/tx/replication/ut_helpers/test_env.h>
+#include <ydb/core/tx/replication/ut_helpers/test_table.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
+
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/string/printf.h>
+
+namespace NKikimr::NReplication::NService {
+
+Y_UNIT_TEST_SUITE(S3Writer) {
+ using namespace NTestHelpers;
+
+ Y_UNIT_TEST(WriteTableS3) {
+ using namespace NWrappers::NTestHelpers;
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TString settings = Sprintf(R"(
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ bucket: "TEST"
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ )", port);
+ Ydb::Export::ExportToS3Settings request;
+ UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(settings, &request));
+
+ auto config = std::make_shared<NWrappers::NExternalStorage::TS3ExternalStorageConfig>(request);
+
+ TEnv env;
+ env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);
+
+ env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{
+ .Name = "Table",
+ .KeyColumns = {"key"},
+ .Columns = {
+ {.Name = "key", .Type = "Uint32"},
+ {.Name = "value", .Type = "Utf8"},
+ },
+ }));
+
+ TString writerUuid = "AtufpxzetsqaVnEuozdXpD"; // basically base58-encoded uuid4
+
+ auto writer = env.GetRuntime().Register(CreateS3Writer(config, "/MyRoot/Table", writerUuid));
+ env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
+
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
+ R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
+
+ using TRecord = TEvWorker::TEvData::TRecord;
+ env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
+ TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
+ TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
+ TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
+ }));
+
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
+ R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/part.1.AtufpxzetsqaVnEuozdXpD.jsonl"),
+ R"({"key":[1], "update":{"value":"10"}})" "\n"
+ R"({"key":[2], "update":{"value":"20"}})" "\n"
+ R"({"key":[3], "update":{"value":"30"}})" "\n");
+
+ auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData({}));
+
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, TEvWorker::TEvGone::DONE);
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
+ R"({"finished":true,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/part.1.AtufpxzetsqaVnEuozdXpD.jsonl"),
+ R"({"key":[1], "update":{"value":"10"}})" "\n"
+ R"({"key":[2], "update":{"value":"20"}})" "\n"
+ R"({"key":[3], "update":{"value":"30"}})" "\n");
+ }
+
+ // TODO test all retry behavior
+}
+
+}
diff --git a/ydb/core/tx/replication/service/ut_s3_writer/ya.make b/ydb/core/tx/replication/service/ut_s3_writer/ya.make
new file mode 100644
index 00000000000..b15903f35c2
--- /dev/null
+++ b/ydb/core/tx/replication/service/ut_s3_writer/ya.make
@@ -0,0 +1,21 @@
+UNITTEST_FOR(ydb/core/tx/replication/service)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+
+TIMEOUT(600)
+
+PEERDIR(
+ ydb/core/tx/replication/ut_helpers
+ library/cpp/string_utils/base64
+ library/cpp/testing/unittest
+)
+
+SRCS(
+ s3_writer_ut.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h
index 523214190ac..f89f2571729 100644
--- a/ydb/core/tx/replication/service/worker.h
+++ b/ydb/core/tx/replication/service/worker.h
@@ -45,6 +45,8 @@ struct TEvWorker {
struct TEvGone: public TEventLocal<TEvGone, EvGone> {
enum EStatus {
+ DONE,
+ S3_ERROR,
SCHEME_ERROR,
UNAVAILABLE,
};
diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make
index cae33d0cdbb..dda4ba4a11f 100644
--- a/ydb/core/tx/replication/service/ya.make
+++ b/ydb/core/tx/replication/service/ya.make
@@ -12,6 +12,7 @@ PEERDIR(
ydb/core/tx/replication/ydb_proxy
ydb/library/actors/core
ydb/library/services
+ ydb/core/wrappers
library/cpp/json
)
@@ -19,6 +20,7 @@ SRCS(
json_change_record.cpp
service.cpp
table_writer.cpp
+ s3_writer.cpp
topic_reader.cpp
worker.cpp
)
@@ -30,6 +32,7 @@ YQL_LAST_ABI_VERSION()
END()
RECURSE_FOR_TESTS(
+ ut_s3_writer
ut_table_writer
ut_topic_reader
ut_worker
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 14d26f434c7..aeccf0da381 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -288,6 +288,7 @@ enum EServiceKikimr {
DATASHARD_RESTORE = 801;
IMPORT = 802;
S3_WRAPPER = 803;
+ CONTINUOUS_BACKUP = 804;
// System views
SYSTEM_VIEWS = 900;
@@ -1025,5 +1026,6 @@ message TActivity {
PERSQUEUE_ACCOUNT_WRITE_QUOTER = 631;
PERSQUEUE_WRITE_QUOTER = 632;
REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR = 633;
+ REPLICATION_S3_WRITER = 634;
};
};