aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-02-16 19:02:04 +0300
committerGitHub <noreply@github.com>2024-02-16 19:02:04 +0300
commitd71b454b2d5bd3a833519956dd5c218dbb2d2b5e (patch)
treed911e4bbd891f5c0f55132613cb7ddde2ca5f318
parent5e1e046209a6c2b6e63ed498b1f82e61079e227b (diff)
downloadydb-d71b454b2d5bd3a833519956dd5c218dbb2d2b5e.tar.gz
Auto-commit offsets KIKIMR-21006 (#2029)
-rw-r--r--ydb/core/tx/replication/service/topic_reader.cpp41
-rw-r--r--ydb/core/tx/replication/service/topic_reader.h8
-rw-r--r--ydb/core/tx/replication/service/topic_reader_ut.cpp6
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp17
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.h23
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp31
6 files changed, 61 insertions, 65 deletions
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp
index b6eb8155b3e..217af89bb08 100644
--- a/ydb/core/tx/replication/service/topic_reader.cpp
+++ b/ydb/core/tx/replication/service/topic_reader.cpp
@@ -13,14 +13,12 @@
namespace NKikimr::NReplication::NService {
class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
- using TReadSessionSettings = NYdb::NTopic::TReadSessionSettings;
-
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
<< "[RemoteTopicReader]"
- << "[" << Settings.Topics_[0].Path_ << "]"
- << "[" << Settings.Topics_[0].PartitionIds_[0] << "]"
+ << "[" << Settings.GetBase().Topics_[0].Path_ << "]"
+ << "[" << Settings.GetBase().Topics_[0].PartitionIds_[0] << "]"
<< SelfId() << " ";
}
@@ -50,18 +48,6 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
Y_ABORT_UNLESS(ReadSession);
Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest());
-
- if (CommitOffset) {
- LOG_D("Commit offset"
- << ": offset# " << CommitOffset);
-
- Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(
- Settings.Topics_[0].Path_,
- Settings.Topics_[0].PartitionIds_[0],
- Settings.ConsumerName_,
- CommitOffset, {}
- ));
- }
}
void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
@@ -72,23 +58,12 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
for (auto& msg : result.Messages) {
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
- Y_DEBUG_ABORT_UNLESS(msg.GetOffset() + 1 > CommitOffset);
- CommitOffset = Max(CommitOffset, msg.GetOffset() + 1);
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
}
Send(Worker, new TEvWorker::TEvData(std::move(records)));
}
- void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
- LOG_D("Handle " << ev->Get()->ToString());
-
- if (!ev->Get()->Result.IsSuccess()) {
- LOG_N("Unsuccessful commit offset");
- Leave(TEvWorker::TEvGone::UNAVAILABLE);
- }
- }
-
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
switch (ev->Get()->Result.GetStatus()) {
@@ -118,13 +93,13 @@ public:
return NKikimrServices::TActivity::REPLICATION_REMOTE_TOPIC_READER;
}
- explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts)
+ explicit TRemoteTopicReader(const TActorId& ydbProxy, const TEvYdbProxy::TTopicReaderSettings& opts)
: TActor(&TThis::StateWork)
, YdbProxy(ydbProxy)
, Settings(opts)
{
- Y_ABORT_UNLESS(Settings.Topics_.size() == 1);
- Y_ABORT_UNLESS(Settings.Topics_.at(0).PartitionIds_.size() == 1);
+ Y_ABORT_UNLESS(Settings.GetBase().Topics_.size() == 1);
+ Y_ABORT_UNLESS(Settings.GetBase().Topics_.at(0).PartitionIds_.size() == 1);
}
STFUNC(StateWork) {
@@ -133,7 +108,6 @@ public:
hFunc(TEvWorker::TEvPoll, Handle);
hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
- hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
hFunc(TEvYdbProxy::TEvTopicReaderGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
@@ -141,16 +115,15 @@ public:
private:
const TActorId YdbProxy;
- const TReadSessionSettings Settings;
+ const TEvYdbProxy::TTopicReaderSettings Settings;
mutable TMaybe<TString> LogPrefix;
TActorId Worker;
TActorId ReadSession;
- ui64 CommitOffset = 0;
}; // TRemoteTopicReader
-IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts) {
+IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const TEvYdbProxy::TTopicReaderSettings& opts) {
return new TRemoteTopicReader(ydbProxy, opts);
}
diff --git a/ydb/core/tx/replication/service/topic_reader.h b/ydb/core/tx/replication/service/topic_reader.h
index 56a491c2cc5..4e07c0397b2 100644
--- a/ydb/core/tx/replication/service/topic_reader.h
+++ b/ydb/core/tx/replication/service/topic_reader.h
@@ -1,13 +1,9 @@
#pragma once
-#include <ydb/core/base/defs.h>
-
-namespace NYdb::NTopic {
- struct TReadSessionSettings;
-}
+#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
namespace NKikimr::NReplication::NService {
-IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts);
+IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const TEvYdbProxy::TTopicReaderSettings& opts);
}
diff --git a/ydb/core/tx/replication/service/topic_reader_ut.cpp b/ydb/core/tx/replication/service/topic_reader_ut.cpp
index ba0b624c138..1b66e29e0d4 100644
--- a/ydb/core/tx/replication/service/topic_reader_ut.cpp
+++ b/ydb/core/tx/replication/service/topic_reader_ut.cpp
@@ -12,7 +12,7 @@ namespace NKikimr::NReplication::NService {
Y_UNIT_TEST_SUITE(RemoteTopicReader) {
template <typename Env>
- TActorId CreateReader(Env& env, const NYdb::NTopic::TReadSessionSettings& settings) {
+ TActorId CreateReader(Env& env, const TEvYdbProxy::TTopicReaderSettings& settings) {
auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());
@@ -36,7 +36,7 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
}
template <typename Env>
- auto ReadData(Env& env, TActorId& reader, const NYdb::NTopic::TReadSessionSettings& settings) {
+ auto ReadData(Env& env, TActorId& reader, const TEvYdbProxy::TTopicReaderSettings& settings) {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());
@@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
UNIT_ASSERT(ev->Get()->Result.IsSuccess());
}
- auto settings = NYdb::NTopic::TReadSessionSettings()
+ auto settings = TEvYdbProxy::TTopicReaderSettings()
.ConsumerName("consumer")
.AppendTopics(NYdb::NTopic::TTopicReadSettings()
.Path("/Root/topic")
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
index 25778421658..dad3284467a 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
@@ -175,6 +175,9 @@ private:
class TTopicReader: public TBaseProxyActor<TTopicReader> {
void Handle(TEvYdbProxy::TEvReadTopicRequest::TPtr& ev) {
+ if (AutoCommit) {
+ DeferredCommit.Commit();
+ }
WaitEvent(ev->Sender, ev->Cookie);
}
@@ -200,6 +203,9 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
x->Confirm();
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (auto* x = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ if (AutoCommit) {
+ DeferredCommit.Add(*x);
+ }
return (void)Send(ev->Get()->Sender, new TEvYdbProxy::TEvReadTopicResponse(*x), 0, ev->Get()->Cookie);
} else if (std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) {
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
@@ -226,9 +232,10 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
}
public:
- explicit TTopicReader(const std::shared_ptr<IReadSession>& session)
+ explicit TTopicReader(const std::shared_ptr<IReadSession>& session, bool autoCommit)
: TBaseProxyActor(&TThis::StateWork)
, Session(session)
+ , AutoCommit(autoCommit)
{
}
@@ -244,6 +251,8 @@ public:
private:
std::shared_ptr<IReadSession> Session;
+ const bool AutoCommit;
+ TDeferredCommit DeferredCommit;
}; // TTopicReader
@@ -394,8 +403,10 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
void Handle(TEvYdbProxy::TEvCreateTopicReaderRequest::TPtr& ev) {
auto* client = EnsureClient<TTopicClient>();
auto args = std::move(ev->Get()->GetArgs());
- auto session = std::apply(&TTopicClient::CreateReadSession, std::tuple_cat(std::tie(client), std::move(args)));
- Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(RegisterWithSameMailbox(new TTopicReader(session))));
+ const auto& settings = std::get<TEvYdbProxy::TTopicReaderSettings>(args);
+ auto session = std::invoke(&TTopicClient::CreateReadSession, client, settings.GetBase());
+ auto reader = RegisterWithSameMailbox(new TTopicReader(session, settings.AutoCommit_));
+ Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(reader));
}
void Handle(TEvYdbProxy::TEvCommitOffsetRequest::TPtr& ev) {
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
index bfb61295c13..4e7ab911a8f 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
@@ -124,6 +124,27 @@ struct TEvYdbProxy {
using TBase::TBase;
};
+ struct TTopicReaderSettings: private NYdb::NTopic::TReadSessionSettings {
+ using TSelf = TTopicReaderSettings;
+ using TBase = NYdb::NTopic::TReadSessionSettings;
+
+ FLUENT_SETTING_DEFAULT(bool, AutoCommit, true);
+
+ template <typename... Args>
+ TSelf& ConsumerName(Args&&... args) {
+ return static_cast<TSelf&>(TBase::ConsumerName(std::forward<Args>(args)...));
+ }
+
+ template <typename... Args>
+ TSelf& AppendTopics(Args&&... args) {
+ return static_cast<TSelf&>(TBase::AppendTopics(std::forward<Args>(args)...));
+ }
+
+ const TBase& GetBase() const {
+ return *this;
+ }
+ };
+
struct TReadTopicResult {
class TMessage {
using TDataEvent = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent;
@@ -212,7 +233,7 @@ struct TEvYdbProxy {
DEFINE_GENERIC_REQUEST_RESPONSE(DropTopic, NYdb::TStatus, TString, NYdb::NTopic::TDropTopicSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings);
- DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, NYdb::NTopic::TReadSessionSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, TTopicReaderSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, void);
DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings);
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
index c7c811a63b9..6b5a749ef4a 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
@@ -6,6 +6,8 @@
#include <library/cpp/testing/unittest/registar.h>
+#include <util/string/printf.h>
+
namespace NKikimr::NReplication {
Y_UNIT_TEST_SUITE(YdbProxyTests) {
@@ -569,7 +571,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
template <typename Env>
TActorId CreateTopicReader(Env& env, const TString& topicPath) {
- auto settings = NYdb::NTopic::TReadSessionSettings()
+ auto settings = TEvYdbProxy::TTopicReaderSettings()
.ConsumerName("consumer")
.AppendTopics(NYdb::NTopic::TTopicReadSettings(topicPath));
@@ -621,19 +623,14 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
TActorId reader = CreateTopicReader(env, "/Root/topic");
- UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1"));
+ UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-0"));
{
auto data = ReadTopicData(env, reader, "/Root/topic");
UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1);
const auto& msg = data.Messages.at(0);
UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), 0);
- UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-1");
-
- auto ev = env.Send<TEvYdbProxy::TEvCommitOffsetResponse>(
- new TEvYdbProxy::TEvCommitOffsetRequest("/Root/topic", 0, "consumer", msg.GetOffset() + 1, {}));
- UNIT_ASSERT(ev);
- UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-0");
}
// wait next event
@@ -654,19 +651,17 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
env.SendAsync(reader, new TEvents::TEvPoison());
}
- UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2"));
+ UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1"));
{
auto data = ReadTopicData(env, newReader, "/Root/topic");
- UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1);
-
- const auto& msg = data.Messages.at(0);
- UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), 1);
- UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-2");
+ UNIT_ASSERT(data.Messages.size() >= 1);
- auto ev = env.Send<TEvYdbProxy::TEvCommitOffsetResponse>(
- new TEvYdbProxy::TEvCommitOffsetRequest("/Root/topic", 0, "consumer", msg.GetOffset() + 1, {}));
- UNIT_ASSERT(ev);
- UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ for (int i = data.Messages.size() - 1; i >= 0; --i) {
+ const auto offset = i + int(data.Messages.size() == 1);
+ const auto& msg = data.Messages.at(i);
+ UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), offset);
+ UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), Sprintf("message-%i", offset));
+ }
}
}