diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-01-18 12:59:23 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-18 12:59:23 +0300 |
commit | 404ef8886ecc9736bc58ade6da2fbd83b486a408 (patch) | |
tree | 0339d0a6ab6444703d2ed68f7945e5b21a560a2b | |
parent | fa40ee8c90448fa7bab0264d40012edd8f117b29 (diff) | |
download | ydb-404ef8886ecc9736bc58ade6da2fbd83b486a408.tar.gz |
Implemented RemoteTopicReader KIKIMR-20306 (#1093)
-rw-r--r-- | ydb/core/base/events.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer.cpp | 47 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 110 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.cpp | 148 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.h | 43 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/ya.make | 5 | ||||
-rw-r--r-- | ydb/library/services/services.proto | 1 |
9 files changed, 377 insertions, 0 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index d192a702ff..f9bf6ad31f 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -171,6 +171,7 @@ struct TKikimrEvents : TEvents { ES_TABLE_CREATOR, ES_PQ_PARTITION_CHOOSER, ES_GRAPH, + ES_REPLICATION_SERVICE, }; }; diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp new file mode 100644 index 0000000000..0ff61d0307 --- /dev/null +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -0,0 +1,47 @@ +#include "table_writer.h" +#include "worker.h" + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/hfunc.h> + +namespace NKikimr::NReplication::NService { + +class TLocalTableWriter: public TActor<TLocalTableWriter> { + void Handle(TEvWorker::TEvHandshake::TPtr& ev) { + Worker = ev->Sender; + Send(Worker, new TEvWorker::TEvHandshake()); + } + + void Handle(TEvWorker::TEvData::TPtr& ev) { + Worker = ev->Sender; + // TODO + } + +public: + explicit TLocalTableWriter(const TString& path) + : TActor(&TThis::StateWork) + , Path(path) + { + Y_UNUSED(Path); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvWorker::TEvHandshake, Handle); + hFunc(TEvWorker::TEvData, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TString Path; + + TActorId Worker; + +}; // TLocalTableWriter + +IActor* CreateLocalTableWriter(const TString& path) { + return new TLocalTableWriter(path); +} + +} diff --git a/ydb/core/tx/replication/service/table_writer.h b/ydb/core/tx/replication/service/table_writer.h new file mode 100644 index 0000000000..95ebde89ed --- /dev/null +++ b/ydb/core/tx/replication/service/table_writer.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/core/base/defs.h> + +namespace NKikimr::NReplication::NService { + +IActor* CreateLocalTableWriter(const TString& path); + +} diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp new file mode 100644 index 0000000000..62ccd9ca65 --- /dev/null +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -0,0 +1,110 @@ +#include "topic_reader.h" +#include "worker.h" + +#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/hfunc.h> + +namespace NKikimr::NReplication::NService { + +class TRemoteTopicReader: public TActor<TRemoteTopicReader> { + using TReadSessionSettings = NYdb::NTopic::TReadSessionSettings; + + void Handle(TEvWorker::TEvHandshake::TPtr& ev) { + Worker = ev->Sender; + + Y_ABORT_UNLESS(!ReadSession); + Send(YdbProxy, new TEvYdbProxy::TEvCreateTopicReaderRequest(Settings)); + } + + void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) { + ReadSession = ev->Get()->Result; + + Y_ABORT_UNLESS(!Worker); + Send(Worker, new TEvWorker::TEvHandshake()); + } + + void Handle(TEvWorker::TEvPoll::TPtr&) { + Y_ABORT_UNLESS(ReadSession); + Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest()); + + if (CommitOffset) { + Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest( + Settings.Topics_[0].Path_, + Settings.Topics_[0].PartitionIds_[0], + Settings.ConsumerName_, + CommitOffset, {} + )); + } + } + + void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) { + auto& result = ev->Get()->Result; + TVector<TEvWorker::TEvData::TRecord> records(Reserve(result.Messages.size())); + + for (auto& msg : result.Messages) { + Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW); + Y_DEBUG_ABORT_UNLESS(msg.GetOffset() + 1 > CommitOffset); + CommitOffset = Max(CommitOffset, msg.GetOffset() + 1); + records.emplace_back(msg.GetOffset(), std::move(msg.GetData())); + } + + Send(Worker, new TEvWorker::TEvData(std::move(records))); + } + + void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) { + if (!ev->Get()->Result.IsSuccess()) { + Leave(); + } + } + + void Leave() { + Send(Worker, new TEvents::TEvGone()); + PassAway(); + } + + void PassAway() override { + if (const auto& actorId = std::exchange(ReadSession, {})) { + Send(actorId, new TEvents::TEvPoison()); + } + + TActor::PassAway(); + } + +public: + explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts) + : TActor(&TThis::StateWork) + , YdbProxy(ydbProxy) + , Settings(opts) + { + Y_ABORT_UNLESS(Settings.Topics_.size() == 1); + Y_ABORT_UNLESS(Settings.Topics_.at(0).PartitionIds_.size() == 1); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvWorker::TEvHandshake, Handle); + hFunc(TEvWorker::TEvPoll, Handle); + hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle); + hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle); + hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle); + sFunc(TEvents::TEvGone, Leave); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId YdbProxy; + const TReadSessionSettings Settings; + + TActorId Worker; + TActorId ReadSession; + ui64 CommitOffset = 0; + +}; // TRemoteTopicReader + +IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts) { + return new TRemoteTopicReader(ydbProxy, opts); +} + +} diff --git a/ydb/core/tx/replication/service/topic_reader.h b/ydb/core/tx/replication/service/topic_reader.h new file mode 100644 index 0000000000..56a491c2cc --- /dev/null +++ b/ydb/core/tx/replication/service/topic_reader.h @@ -0,0 +1,13 @@ +#pragma once + +#include <ydb/core/base/defs.h> + +namespace NYdb::NTopic { + struct TReadSessionSettings; +} + +namespace NKikimr::NReplication::NService { + +IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts); + +} diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp new file mode 100644 index 0000000000..0881c3c783 --- /dev/null +++ b/ydb/core/tx/replication/service/worker.cpp @@ -0,0 +1,148 @@ +#include "worker.h" + +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/services/services.pb.h> + +namespace NKikimr::NReplication::NService { + +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data) + : Offset(offset) + , Data(data) +{ +} + +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data) + : Offset(offset) + , Data(std::move(data)) +{ +} + +TEvWorker::TEvData::TEvData(TVector<TRecord>&& records) + : Records(std::move(records)) +{ +} + +class TWorker: public TActorBootstrapped<TWorker> { + struct TActorInfo { + THolder<IActor> Actor; + TActorId ActorId; + bool InitDone; + + explicit TActorInfo(THolder<IActor>&& actor) + : Actor(std::move(actor)) + , InitDone(false) + { + } + + operator TActorId() const { + return ActorId; + } + + explicit operator bool() const { + return InitDone; + } + }; + + TActorId RegisterActor(TActorInfo& info) { + Y_ABORT_UNLESS(info.Actor); + info.ActorId = RegisterWithSameMailbox(info.Actor.Release()); + return info.ActorId; + } + + void InitActor(TActorInfo& info) { + Y_ABORT_UNLESS(info.ActorId); + Send(info.ActorId, new TEvWorker::TEvHandshake()); + info.InitDone = false; + } + + void Handle(TEvWorker::TEvHandshake::TPtr& ev) { + if (ev->Sender == Reader) { + Reader.InitDone = true; + } else if (ev->Sender == Writer) { + Writer.InitDone = true; + } else { + // TODO: log warn + } + + if (Reader && Writer) { + Send(Reader, new TEvWorker::TEvPoll()); + } + } + + void Handle(TEvWorker::TEvPoll::TPtr& ev) { + if (ev->Sender != Writer) { + // TODO: log warn + return; + } + + Send(ev->Forward(Reader)); + } + + void Handle(TEvWorker::TEvData::TPtr& ev) { + if (ev->Sender != Reader) { + // TODO: log warn + return; + } + + Send(ev->Forward(Writer)); + } + + void Handle(TEvents::TEvGone::TPtr& ev) { + if (ev->Sender == Reader) { + // TODO + } else if (ev->Sender == Writer) { + // TODO + } else { + // TODO: log warn + } + } + + void PassAway() override { + for (auto* actor : {&Reader, &Writer}) { + Send(*actor, new TEvents::TEvPoison()); + } + + TActorBootstrapped::PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_WORKER; + } + + explicit TWorker(THolder<IActor>&& reader, THolder<IActor>&& writer) + : Reader(std::move(reader)) + , Writer(std::move(writer)) + { + } + + void Bootstrap() { + for (auto* actor : {&Reader, &Writer}) { + RegisterActor(*actor); + InitActor(*actor); + } + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvWorker::TEvHandshake, Handle); + hFunc(TEvWorker::TEvPoll, Handle); + hFunc(TEvWorker::TEvData, Handle); + hFunc(TEvents::TEvGone, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + TActorInfo Reader; + TActorInfo Writer; +}; + +IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer) { + return new TWorker(std::move(reader), std::move(writer)); +} + +} diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h new file mode 100644 index 0000000000..1f244010ae --- /dev/null +++ b/ydb/core/tx/replication/service/worker.h @@ -0,0 +1,43 @@ +#pragma once + +#include <ydb/core/base/defs.h> +#include <ydb/core/base/events.h> + +#include <util/generic/vector.h> + +namespace NKikimr::NReplication::NService { + +struct TEvWorker { + enum EEv { + EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE), + + EvHandshake, + EvPoll, + EvData, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE)); + + struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {}; + struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {}; + + struct TEvData: public TEventLocal<TEvData, EvData> { + struct TRecord { + ui64 Offset; + TString Data; + + explicit TRecord(ui64 offset, const TString& data); + explicit TRecord(ui64 offset, TString&& data); + }; + + TVector<TRecord> Records; + + explicit TEvData(TVector<TRecord>&& records); + }; +}; + +IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer); + +} diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make index d499493992..9573a41ecb 100644 --- a/ydb/core/tx/replication/service/ya.make +++ b/ydb/core/tx/replication/service/ya.make @@ -2,10 +2,15 @@ LIBRARY() PEERDIR( ydb/core/base + ydb/core/tx/replication/ydb_proxy + ydb/library/actors/core ) SRCS( service.cpp + table_writer.cpp + topic_reader.cpp + worker.cpp ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 525fdd2519..49d3bcf55c 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1015,5 +1015,6 @@ message TActivity { STATISTICS_AGGREGATOR = 622; KAFKA_READ_SESSION_ACTOR = 623; GRAPH_SERVICE = 624; + REPLICATION_WORKER = 625; }; }; |