aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-01-18 12:59:23 +0300
committerGitHub <noreply@github.com>2024-01-18 12:59:23 +0300
commit404ef8886ecc9736bc58ade6da2fbd83b486a408 (patch)
tree0339d0a6ab6444703d2ed68f7945e5b21a560a2b
parentfa40ee8c90448fa7bab0264d40012edd8f117b29 (diff)
downloadydb-404ef8886ecc9736bc58ade6da2fbd83b486a408.tar.gz
Implemented RemoteTopicReader KIKIMR-20306 (#1093)
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/tx/replication/service/table_writer.cpp47
-rw-r--r--ydb/core/tx/replication/service/table_writer.h9
-rw-r--r--ydb/core/tx/replication/service/topic_reader.cpp110
-rw-r--r--ydb/core/tx/replication/service/topic_reader.h13
-rw-r--r--ydb/core/tx/replication/service/worker.cpp148
-rw-r--r--ydb/core/tx/replication/service/worker.h43
-rw-r--r--ydb/core/tx/replication/service/ya.make5
-rw-r--r--ydb/library/services/services.proto1
9 files changed, 377 insertions, 0 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index d192a702ff..f9bf6ad31f 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -171,6 +171,7 @@ struct TKikimrEvents : TEvents {
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
+ ES_REPLICATION_SERVICE,
};
};
diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp
new file mode 100644
index 0000000000..0ff61d0307
--- /dev/null
+++ b/ydb/core/tx/replication/service/table_writer.cpp
@@ -0,0 +1,47 @@
+#include "table_writer.h"
+#include "worker.h"
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication::NService {
+
+class TLocalTableWriter: public TActor<TLocalTableWriter> {
+ void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+ Worker = ev->Sender;
+ Send(Worker, new TEvWorker::TEvHandshake());
+ }
+
+ void Handle(TEvWorker::TEvData::TPtr& ev) {
+ Worker = ev->Sender;
+ // TODO
+ }
+
+public:
+ explicit TLocalTableWriter(const TString& path)
+ : TActor(&TThis::StateWork)
+ , Path(path)
+ {
+ Y_UNUSED(Path);
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvWorker::TEvHandshake, Handle);
+ hFunc(TEvWorker::TEvData, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ const TString Path;
+
+ TActorId Worker;
+
+}; // TLocalTableWriter
+
+IActor* CreateLocalTableWriter(const TString& path) {
+ return new TLocalTableWriter(path);
+}
+
+}
diff --git a/ydb/core/tx/replication/service/table_writer.h b/ydb/core/tx/replication/service/table_writer.h
new file mode 100644
index 0000000000..95ebde89ed
--- /dev/null
+++ b/ydb/core/tx/replication/service/table_writer.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication::NService {
+
+IActor* CreateLocalTableWriter(const TString& path);
+
+}
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp
new file mode 100644
index 0000000000..62ccd9ca65
--- /dev/null
+++ b/ydb/core/tx/replication/service/topic_reader.cpp
@@ -0,0 +1,110 @@
+#include "topic_reader.h"
+#include "worker.h"
+
+#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication::NService {
+
+class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
+ using TReadSessionSettings = NYdb::NTopic::TReadSessionSettings;
+
+ void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+ Worker = ev->Sender;
+
+ Y_ABORT_UNLESS(!ReadSession);
+ Send(YdbProxy, new TEvYdbProxy::TEvCreateTopicReaderRequest(Settings));
+ }
+
+ void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) {
+ ReadSession = ev->Get()->Result;
+
+ Y_ABORT_UNLESS(!Worker);
+ Send(Worker, new TEvWorker::TEvHandshake());
+ }
+
+ void Handle(TEvWorker::TEvPoll::TPtr&) {
+ Y_ABORT_UNLESS(ReadSession);
+ Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest());
+
+ if (CommitOffset) {
+ Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(
+ Settings.Topics_[0].Path_,
+ Settings.Topics_[0].PartitionIds_[0],
+ Settings.ConsumerName_,
+ CommitOffset, {}
+ ));
+ }
+ }
+
+ void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
+ auto& result = ev->Get()->Result;
+ TVector<TEvWorker::TEvData::TRecord> records(Reserve(result.Messages.size()));
+
+ for (auto& msg : result.Messages) {
+ Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
+ Y_DEBUG_ABORT_UNLESS(msg.GetOffset() + 1 > CommitOffset);
+ CommitOffset = Max(CommitOffset, msg.GetOffset() + 1);
+ records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
+ }
+
+ Send(Worker, new TEvWorker::TEvData(std::move(records)));
+ }
+
+ void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
+ if (!ev->Get()->Result.IsSuccess()) {
+ Leave();
+ }
+ }
+
+ void Leave() {
+ Send(Worker, new TEvents::TEvGone());
+ PassAway();
+ }
+
+ void PassAway() override {
+ if (const auto& actorId = std::exchange(ReadSession, {})) {
+ Send(actorId, new TEvents::TEvPoison());
+ }
+
+ TActor::PassAway();
+ }
+
+public:
+ explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts)
+ : TActor(&TThis::StateWork)
+ , YdbProxy(ydbProxy)
+ , Settings(opts)
+ {
+ Y_ABORT_UNLESS(Settings.Topics_.size() == 1);
+ Y_ABORT_UNLESS(Settings.Topics_.at(0).PartitionIds_.size() == 1);
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvWorker::TEvHandshake, Handle);
+ hFunc(TEvWorker::TEvPoll, Handle);
+ hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
+ hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
+ hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
+ sFunc(TEvents::TEvGone, Leave);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ const TActorId YdbProxy;
+ const TReadSessionSettings Settings;
+
+ TActorId Worker;
+ TActorId ReadSession;
+ ui64 CommitOffset = 0;
+
+}; // TRemoteTopicReader
+
+IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts) {
+ return new TRemoteTopicReader(ydbProxy, opts);
+}
+
+}
diff --git a/ydb/core/tx/replication/service/topic_reader.h b/ydb/core/tx/replication/service/topic_reader.h
new file mode 100644
index 0000000000..56a491c2cc
--- /dev/null
+++ b/ydb/core/tx/replication/service/topic_reader.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NYdb::NTopic {
+ struct TReadSessionSettings;
+}
+
+namespace NKikimr::NReplication::NService {
+
+IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts);
+
+}
diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp
new file mode 100644
index 0000000000..0881c3c783
--- /dev/null
+++ b/ydb/core/tx/replication/service/worker.cpp
@@ -0,0 +1,148 @@
+#include "worker.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/services/services.pb.h>
+
+namespace NKikimr::NReplication::NService {
+
+TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data)
+ : Offset(offset)
+ , Data(data)
+{
+}
+
+TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
+ : Offset(offset)
+ , Data(std::move(data))
+{
+}
+
+TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
+ : Records(std::move(records))
+{
+}
+
+class TWorker: public TActorBootstrapped<TWorker> {
+ struct TActorInfo {
+ THolder<IActor> Actor;
+ TActorId ActorId;
+ bool InitDone;
+
+ explicit TActorInfo(THolder<IActor>&& actor)
+ : Actor(std::move(actor))
+ , InitDone(false)
+ {
+ }
+
+ operator TActorId() const {
+ return ActorId;
+ }
+
+ explicit operator bool() const {
+ return InitDone;
+ }
+ };
+
+ TActorId RegisterActor(TActorInfo& info) {
+ Y_ABORT_UNLESS(info.Actor);
+ info.ActorId = RegisterWithSameMailbox(info.Actor.Release());
+ return info.ActorId;
+ }
+
+ void InitActor(TActorInfo& info) {
+ Y_ABORT_UNLESS(info.ActorId);
+ Send(info.ActorId, new TEvWorker::TEvHandshake());
+ info.InitDone = false;
+ }
+
+ void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+ if (ev->Sender == Reader) {
+ Reader.InitDone = true;
+ } else if (ev->Sender == Writer) {
+ Writer.InitDone = true;
+ } else {
+ // TODO: log warn
+ }
+
+ if (Reader && Writer) {
+ Send(Reader, new TEvWorker::TEvPoll());
+ }
+ }
+
+ void Handle(TEvWorker::TEvPoll::TPtr& ev) {
+ if (ev->Sender != Writer) {
+ // TODO: log warn
+ return;
+ }
+
+ Send(ev->Forward(Reader));
+ }
+
+ void Handle(TEvWorker::TEvData::TPtr& ev) {
+ if (ev->Sender != Reader) {
+ // TODO: log warn
+ return;
+ }
+
+ Send(ev->Forward(Writer));
+ }
+
+ void Handle(TEvents::TEvGone::TPtr& ev) {
+ if (ev->Sender == Reader) {
+ // TODO
+ } else if (ev->Sender == Writer) {
+ // TODO
+ } else {
+ // TODO: log warn
+ }
+ }
+
+ void PassAway() override {
+ for (auto* actor : {&Reader, &Writer}) {
+ Send(*actor, new TEvents::TEvPoison());
+ }
+
+ TActorBootstrapped::PassAway();
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_WORKER;
+ }
+
+ explicit TWorker(THolder<IActor>&& reader, THolder<IActor>&& writer)
+ : Reader(std::move(reader))
+ , Writer(std::move(writer))
+ {
+ }
+
+ void Bootstrap() {
+ for (auto* actor : {&Reader, &Writer}) {
+ RegisterActor(*actor);
+ InitActor(*actor);
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvWorker::TEvHandshake, Handle);
+ hFunc(TEvWorker::TEvPoll, Handle);
+ hFunc(TEvWorker::TEvData, Handle);
+ hFunc(TEvents::TEvGone, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ TActorInfo Reader;
+ TActorInfo Writer;
+};
+
+IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer) {
+ return new TWorker(std::move(reader), std::move(writer));
+}
+
+}
diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h
new file mode 100644
index 0000000000..1f244010ae
--- /dev/null
+++ b/ydb/core/tx/replication/service/worker.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+#include <ydb/core/base/events.h>
+
+#include <util/generic/vector.h>
+
+namespace NKikimr::NReplication::NService {
+
+struct TEvWorker {
+ enum EEv {
+ EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),
+
+ EvHandshake,
+ EvPoll,
+ EvData,
+
+ EvEnd,
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));
+
+ struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {};
+ struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};
+
+ struct TEvData: public TEventLocal<TEvData, EvData> {
+ struct TRecord {
+ ui64 Offset;
+ TString Data;
+
+ explicit TRecord(ui64 offset, const TString& data);
+ explicit TRecord(ui64 offset, TString&& data);
+ };
+
+ TVector<TRecord> Records;
+
+ explicit TEvData(TVector<TRecord>&& records);
+ };
+};
+
+IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer);
+
+}
diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make
index d499493992..9573a41ecb 100644
--- a/ydb/core/tx/replication/service/ya.make
+++ b/ydb/core/tx/replication/service/ya.make
@@ -2,10 +2,15 @@ LIBRARY()
PEERDIR(
ydb/core/base
+ ydb/core/tx/replication/ydb_proxy
+ ydb/library/actors/core
)
SRCS(
service.cpp
+ table_writer.cpp
+ topic_reader.cpp
+ worker.cpp
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 525fdd2519..49d3bcf55c 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -1015,5 +1015,6 @@ message TActivity {
STATISTICS_AGGREGATOR = 622;
KAFKA_READ_SESSION_ACTOR = 623;
GRAPH_SERVICE = 624;
+ REPLICATION_WORKER = 625;
};
};