diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-02-16 19:02:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-16 19:02:04 +0300 |
commit | d71b454b2d5bd3a833519956dd5c218dbb2d2b5e (patch) | |
tree | d911e4bbd891f5c0f55132613cb7ddde2ca5f318 | |
parent | 5e1e046209a6c2b6e63ed498b1f82e61079e227b (diff) | |
download | ydb-d71b454b2d5bd3a833519956dd5c218dbb2d2b5e.tar.gz |
Auto-commit offsets KIKIMR-21006 (#2029)
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 41 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 23 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp | 31 |
6 files changed, 61 insertions, 65 deletions
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index b6eb8155b3e..217af89bb08 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -13,14 +13,12 @@ namespace NKikimr::NReplication::NService { class TRemoteTopicReader: public TActor<TRemoteTopicReader> { - using TReadSessionSettings = NYdb::NTopic::TReadSessionSettings; - TStringBuf GetLogPrefix() const { if (!LogPrefix) { LogPrefix = TStringBuilder() << "[RemoteTopicReader]" - << "[" << Settings.Topics_[0].Path_ << "]" - << "[" << Settings.Topics_[0].PartitionIds_[0] << "]" + << "[" << Settings.GetBase().Topics_[0].Path_ << "]" + << "[" << Settings.GetBase().Topics_[0].PartitionIds_[0] << "]" << SelfId() << " "; } @@ -50,18 +48,6 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { Y_ABORT_UNLESS(ReadSession); Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest()); - - if (CommitOffset) { - LOG_D("Commit offset" - << ": offset# " << CommitOffset); - - Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest( - Settings.Topics_[0].Path_, - Settings.Topics_[0].PartitionIds_[0], - Settings.ConsumerName_, - CommitOffset, {} - )); - } } void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) { @@ -72,23 +58,12 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { for (auto& msg : result.Messages) { Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW); - Y_DEBUG_ABORT_UNLESS(msg.GetOffset() + 1 > CommitOffset); - CommitOffset = Max(CommitOffset, msg.GetOffset() + 1); records.emplace_back(msg.GetOffset(), std::move(msg.GetData())); } Send(Worker, new TEvWorker::TEvData(std::move(records))); } - void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) { - LOG_D("Handle " << ev->Get()->ToString()); - - if (!ev->Get()->Result.IsSuccess()) { - LOG_N("Unsuccessful commit offset"); - Leave(TEvWorker::TEvGone::UNAVAILABLE); - } - } - void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); switch (ev->Get()->Result.GetStatus()) { @@ -118,13 +93,13 @@ public: return NKikimrServices::TActivity::REPLICATION_REMOTE_TOPIC_READER; } - explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts) + explicit TRemoteTopicReader(const TActorId& ydbProxy, const TEvYdbProxy::TTopicReaderSettings& opts) : TActor(&TThis::StateWork) , YdbProxy(ydbProxy) , Settings(opts) { - Y_ABORT_UNLESS(Settings.Topics_.size() == 1); - Y_ABORT_UNLESS(Settings.Topics_.at(0).PartitionIds_.size() == 1); + Y_ABORT_UNLESS(Settings.GetBase().Topics_.size() == 1); + Y_ABORT_UNLESS(Settings.GetBase().Topics_.at(0).PartitionIds_.size() == 1); } STFUNC(StateWork) { @@ -133,7 +108,6 @@ public: hFunc(TEvWorker::TEvPoll, Handle); hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle); hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle); - hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle); hFunc(TEvYdbProxy::TEvTopicReaderGone, Handle); sFunc(TEvents::TEvPoison, PassAway); } @@ -141,16 +115,15 @@ public: private: const TActorId YdbProxy; - const TReadSessionSettings Settings; + const TEvYdbProxy::TTopicReaderSettings Settings; mutable TMaybe<TString> LogPrefix; TActorId Worker; TActorId ReadSession; - ui64 CommitOffset = 0; }; // TRemoteTopicReader -IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts) { +IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const TEvYdbProxy::TTopicReaderSettings& opts) { return new TRemoteTopicReader(ydbProxy, opts); } diff --git a/ydb/core/tx/replication/service/topic_reader.h b/ydb/core/tx/replication/service/topic_reader.h index 56a491c2cc5..4e07c0397b2 100644 --- a/ydb/core/tx/replication/service/topic_reader.h +++ b/ydb/core/tx/replication/service/topic_reader.h @@ -1,13 +1,9 @@ #pragma once -#include <ydb/core/base/defs.h> - -namespace NYdb::NTopic { - struct TReadSessionSettings; -} +#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> namespace NKikimr::NReplication::NService { -IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts); +IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const TEvYdbProxy::TTopicReaderSettings& opts); } diff --git a/ydb/core/tx/replication/service/topic_reader_ut.cpp b/ydb/core/tx/replication/service/topic_reader_ut.cpp index ba0b624c138..1b66e29e0d4 100644 --- a/ydb/core/tx/replication/service/topic_reader_ut.cpp +++ b/ydb/core/tx/replication/service/topic_reader_ut.cpp @@ -12,7 +12,7 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(RemoteTopicReader) { template <typename Env> - TActorId CreateReader(Env& env, const NYdb::NTopic::TReadSessionSettings& settings) { + TActorId CreateReader(Env& env, const TEvYdbProxy::TTopicReaderSettings& settings) { auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings)); env.SendAsync(reader, new TEvWorker::TEvHandshake()); @@ -36,7 +36,7 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) { } template <typename Env> - auto ReadData(Env& env, TActorId& reader, const NYdb::NTopic::TReadSessionSettings& settings) { + auto ReadData(Env& env, TActorId& reader, const TEvYdbProxy::TTopicReaderSettings& settings) { reader = CreateReader(env, settings); env.SendAsync(reader, new TEvWorker::TEvPoll()); @@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) { UNIT_ASSERT(ev->Get()->Result.IsSuccess()); } - auto settings = NYdb::NTopic::TReadSessionSettings() + auto settings = TEvYdbProxy::TTopicReaderSettings() .ConsumerName("consumer") .AppendTopics(NYdb::NTopic::TTopicReadSettings() .Path("/Root/topic") diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index 25778421658..dad3284467a 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -175,6 +175,9 @@ private: class TTopicReader: public TBaseProxyActor<TTopicReader> { void Handle(TEvYdbProxy::TEvReadTopicRequest::TPtr& ev) { + if (AutoCommit) { + DeferredCommit.Commit(); + } WaitEvent(ev->Sender, ev->Cookie); } @@ -200,6 +203,9 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> { x->Confirm(); return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie); } else if (auto* x = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) { + if (AutoCommit) { + DeferredCommit.Add(*x); + } return (void)Send(ev->Get()->Sender, new TEvYdbProxy::TEvReadTopicResponse(*x), 0, ev->Get()->Cookie); } else if (std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) { return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie); @@ -226,9 +232,10 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> { } public: - explicit TTopicReader(const std::shared_ptr<IReadSession>& session) + explicit TTopicReader(const std::shared_ptr<IReadSession>& session, bool autoCommit) : TBaseProxyActor(&TThis::StateWork) , Session(session) + , AutoCommit(autoCommit) { } @@ -244,6 +251,8 @@ public: private: std::shared_ptr<IReadSession> Session; + const bool AutoCommit; + TDeferredCommit DeferredCommit; }; // TTopicReader @@ -394,8 +403,10 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> { void Handle(TEvYdbProxy::TEvCreateTopicReaderRequest::TPtr& ev) { auto* client = EnsureClient<TTopicClient>(); auto args = std::move(ev->Get()->GetArgs()); - auto session = std::apply(&TTopicClient::CreateReadSession, std::tuple_cat(std::tie(client), std::move(args))); - Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(RegisterWithSameMailbox(new TTopicReader(session)))); + const auto& settings = std::get<TEvYdbProxy::TTopicReaderSettings>(args); + auto session = std::invoke(&TTopicClient::CreateReadSession, client, settings.GetBase()); + auto reader = RegisterWithSameMailbox(new TTopicReader(session, settings.AutoCommit_)); + Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(reader)); } void Handle(TEvYdbProxy::TEvCommitOffsetRequest::TPtr& ev) { diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index bfb61295c13..4e7ab911a8f 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -124,6 +124,27 @@ struct TEvYdbProxy { using TBase::TBase; }; + struct TTopicReaderSettings: private NYdb::NTopic::TReadSessionSettings { + using TSelf = TTopicReaderSettings; + using TBase = NYdb::NTopic::TReadSessionSettings; + + FLUENT_SETTING_DEFAULT(bool, AutoCommit, true); + + template <typename... Args> + TSelf& ConsumerName(Args&&... args) { + return static_cast<TSelf&>(TBase::ConsumerName(std::forward<Args>(args)...)); + } + + template <typename... Args> + TSelf& AppendTopics(Args&&... args) { + return static_cast<TSelf&>(TBase::AppendTopics(std::forward<Args>(args)...)); + } + + const TBase& GetBase() const { + return *this; + } + }; + struct TReadTopicResult { class TMessage { using TDataEvent = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent; @@ -212,7 +233,7 @@ struct TEvYdbProxy { DEFINE_GENERIC_REQUEST_RESPONSE(DropTopic, NYdb::TStatus, TString, NYdb::NTopic::TDropTopicSettings); DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings); DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings); - DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, NYdb::NTopic::TReadSessionSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, TTopicReaderSettings); DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, void); DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings); diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index c7c811a63b9..6b5a749ef4a 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -6,6 +6,8 @@ #include <library/cpp/testing/unittest/registar.h> +#include <util/string/printf.h> + namespace NKikimr::NReplication { Y_UNIT_TEST_SUITE(YdbProxyTests) { @@ -569,7 +571,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { template <typename Env> TActorId CreateTopicReader(Env& env, const TString& topicPath) { - auto settings = NYdb::NTopic::TReadSessionSettings() + auto settings = TEvYdbProxy::TTopicReaderSettings() .ConsumerName("consumer") .AppendTopics(NYdb::NTopic::TTopicReadSettings(topicPath)); @@ -621,19 +623,14 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { TActorId reader = CreateTopicReader(env, "/Root/topic"); - UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1")); + UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-0")); { auto data = ReadTopicData(env, reader, "/Root/topic"); UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1); const auto& msg = data.Messages.at(0); UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), 0); - UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-1"); - - auto ev = env.Send<TEvYdbProxy::TEvCommitOffsetResponse>( - new TEvYdbProxy::TEvCommitOffsetRequest("/Root/topic", 0, "consumer", msg.GetOffset() + 1, {})); - UNIT_ASSERT(ev); - UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-0"); } // wait next event @@ -654,19 +651,17 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { env.SendAsync(reader, new TEvents::TEvPoison()); } - UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2")); + UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1")); { auto data = ReadTopicData(env, newReader, "/Root/topic"); - UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1); - - const auto& msg = data.Messages.at(0); - UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), 1); - UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-2"); + UNIT_ASSERT(data.Messages.size() >= 1); - auto ev = env.Send<TEvYdbProxy::TEvCommitOffsetResponse>( - new TEvYdbProxy::TEvCommitOffsetRequest("/Root/topic", 0, "consumer", msg.GetOffset() + 1, {})); - UNIT_ASSERT(ev); - UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + for (int i = data.Messages.size() - 1; i >= 0; --i) { + const auto offset = i + int(data.Messages.size() == 1); + const auto& msg = data.Messages.at(i); + UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), offset); + UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), Sprintf("message-%i", offset)); + } } } |