aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-04-24 13:27:27 +0500
committerGitHub <noreply@github.com>2025-04-24 13:27:27 +0500
commit0ddb5fd0f317eb1b835bdcf4d73e4ba05029d8db (patch)
treefd5af81c849ae176ea939b30867b6f6ef8dc20c8
parent305b5cdccc9c2f43a8621aacdf120d1ded1b793c (diff)
downloadydb-0ddb5fd0f317eb1b835bdcf4d73e4ba05029d8db.tar.gz
Fixed committing of proccessed messages by transfer (#17580)
-rw-r--r--ydb/core/tx/replication/service/service.cpp7
-rw-r--r--ydb/core/tx/replication/service/topic_reader.cpp39
-rw-r--r--ydb/core/tx/replication/service/worker.cpp26
-rw-r--r--ydb/core/tx/replication/service/worker.h8
-rw-r--r--ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h2
-rw-r--r--ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp4
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp7
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.h22
-rw-r--r--ydb/tests/functional/replication/utils.h102
9 files changed, 176 insertions, 41 deletions
diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp
index 2d4a7c93110..64cdef9d96b 100644
--- a/ydb/core/tx/replication/service/service.cpp
+++ b/ydb/core/tx/replication/service/service.cpp
@@ -377,7 +377,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
return it->second;
}
- std::function<IActor*(void)> ReaderFn(const NKikimrReplication::TRemoteTopicReaderSettings& settings) {
+ std::function<IActor*(void)> ReaderFn(const NKikimrReplication::TRemoteTopicReaderSettings& settings, bool autoCommit) {
TActorId ydbProxy;
const auto& params = settings.GetConnectionParams();
switch (params.GetCredentialsCase()) {
@@ -394,6 +394,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
auto topicReaderSettings = TEvYdbProxy::TTopicReaderSettings()
.MaxMemoryUsageBytes(1_MB)
.ConsumerName(settings.GetConsumerName())
+ .AutoCommit(autoCommit)
.AppendTopics(NYdb::NTopic::TTopicReadSettings()
.Path(settings.GetTopicPath())
.AppendPartitionIds(settings.GetTopicPartitionId())
@@ -473,6 +474,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
const auto& cmd = record.GetCommand();
// TODO: validate settings
const auto& readerSettings = cmd.GetRemoteTopicReader();
+ bool autoCommit = true;
std::function<IActor*(void)> writerFn;
if (cmd.HasLocalTableWriter()) {
const auto& writerSettings = cmd.GetLocalTableWriter();
@@ -485,12 +487,13 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
LOG_C("Run transfer but TransferWriterFactory does not exists.");
return;
}
+ autoCommit = false;
writerFn = TransferWriterFn(writerSettings, transferWriterFactory);
} else {
Y_ABORT("Unsupported");
}
const auto actorId = session.RegisterWorker(this, id,
- CreateWorker(SelfId(), ReaderFn(readerSettings), std::move(writerFn)));
+ CreateWorker(SelfId(), ReaderFn(readerSettings, autoCommit), std::move(writerFn)));
WorkerActorIdToSession[actorId] = controller.GetTabletId();
}
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp
index e25c3b708f6..d7fde434a96 100644
--- a/ydb/core/tx/replication/service/topic_reader.cpp
+++ b/ydb/core/tx/replication/service/topic_reader.cpp
@@ -68,13 +68,44 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));
}
- void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) {
+ void Handle(TEvYdbProxy::TEvEndTopicPartition::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
auto& result = ev->Get()->Result;
Send(Worker, new TEvWorker::TEvDataEnd(result.PartitionId, std::move(result.AdjacentPartitionsIds), std::move(result.ChildPartitionsIds)));
}
+ void Handle(TEvYdbProxy::TEvStartTopicReadingSession::TPtr& ev) {
+ LOG_D("Handle " << ev->Get()->ToString());
+
+ ReadSessionId = ev->Get()->Result.ReadSessionId;
+ }
+
+ void Handle(TEvWorker::TEvCommit::TPtr& ev) {
+ LOG_D("Handle " << ev->Get()->ToString());
+
+ Y_ABORT_UNLESS(YdbProxy);
+ Y_ABORT_UNLESS(ReadSessionId);
+
+ auto settings = NYdb::NTopic::TCommitOffsetSettings()
+ .ReadSessionId(ReadSessionId);
+
+ const auto& topicName = Settings.GetBase().Topics_.at(0).Path_;
+ const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0);
+ const auto& consumerName = Settings.GetBase().ConsumerName_;
+
+ Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(topicName, partitionId, consumerName, ev->Get()->Offset, std::move(settings)));
+ }
+
+ void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
+ if (!ev->Get()->Result.IsSuccess()) {
+ LOG_W("Handle " << ev->Get()->ToString());
+ return Leave(TEvWorker::TEvGone::UNAVAILABLE);
+ } else {
+ LOG_D("Handle " << ev->Get()->ToString());
+ }
+ }
+
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
@@ -121,9 +152,12 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvPoll, Handle);
+ hFunc(TEvWorker::TEvCommit, Handle);
hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
- hFunc(TEvYdbProxy::TEvTopicEndPartition, Handle);
+ hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
+ hFunc(TEvYdbProxy::TEvStartTopicReadingSession, Handle);
+ hFunc(TEvYdbProxy::TEvEndTopicPartition, Handle);
hFunc(TEvYdbProxy::TEvTopicReaderGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
@@ -136,6 +170,7 @@ private:
TActorId Worker;
TActorId ReadSession;
+ TString ReadSessionId;
}; // TRemoteTopicReader
diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp
index d66eacf1ce3..de1338b733b 100644
--- a/ydb/core/tx/replication/service/worker.cpp
+++ b/ydb/core/tx/replication/service/worker.cpp
@@ -25,6 +25,17 @@ TString TEvWorker::TEvPoll::ToString() const {
<< " }";
}
+TEvWorker::TEvCommit::TEvCommit(size_t offset)
+ : Offset(offset)
+{
+}
+
+TString TEvWorker::TEvCommit::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Offset: " << Offset
+ << " }";
+}
+
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records)
: PartitionId(partitionId)
, Source(source)
@@ -187,6 +198,20 @@ class TWorker: public TActorBootstrapped<TWorker> {
}
}
+ void Handle(TEvWorker::TEvCommit::TPtr& ev) {
+ LOG_D("Handle " << ev->Get()->ToString());
+
+ if (ev->Sender != Writer) {
+ LOG_W("Commit from unknown actor"
+ << ": sender# " << ev->Sender);
+ return;
+ }
+
+ if (Reader) {
+ Send(ev->Forward(Reader));
+ }
+ }
+
void Handle(TEvWorker::TEvData::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
@@ -308,6 +333,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvPoll, Handle);
+ hFunc(TEvWorker::TEvCommit, Handle);
hFunc(TEvWorker::TEvData, Handle);
hFunc(TEvWorker::TEvDataEnd, Forward);
hFunc(TEvWorker::TEvGone, Handle);
diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h
index 1c1dc14133a..11897c9804c 100644
--- a/ydb/core/tx/replication/service/worker.h
+++ b/ydb/core/tx/replication/service/worker.h
@@ -24,6 +24,7 @@ struct TEvWorker {
EvGone,
EvStatus,
EvDataEnd,
+ EvCommit,
EvEnd,
};
@@ -39,6 +40,13 @@ struct TEvWorker {
TString ToString() const override;
};
+ struct TEvCommit: public TEventLocal<TEvCommit, EvCommit> {
+ size_t Offset;
+
+ explicit TEvCommit(size_t offset);
+ TString ToString() const override;
+ };
+
struct TEvData: public TEventLocal<TEvData, EvData> {
ui32 PartitionId;
TString Source;
diff --git a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h
index d051440f477..180165afe9c 100644
--- a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h
+++ b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h
@@ -14,7 +14,7 @@ class TPartitionEndWatcher {
return;
}
- ActorOps->Send(client, new TEvYdbProxy::TEvTopicEndPartition(*EndPartitionSessionEvent));
+ ActorOps->Send(client, new TEvYdbProxy::TEvEndTopicPartition(*EndPartitionSessionEvent));
}
public:
diff --git a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp
index f33fa4716ff..a147c798154 100644
--- a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp
@@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) {
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events[0].first, client);
- auto* e = dynamic_cast<TEvYdbProxy::TEvTopicEndPartition*>(actorOps.Events[0].second);
+ auto* e = dynamic_cast<TEvYdbProxy::TEvEndTopicPartition*>(actorOps.Events[0].second);
UNIT_ASSERT(e);
UNIT_ASSERT_VALUES_EQUAL(e->Result.AdjacentPartitionsIds, TVector<ui64>{1});
UNIT_ASSERT_VALUES_EQUAL(e->Result.ChildPartitionsIds, TVector<ui64>{2});
@@ -120,7 +120,7 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) {
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events[0].first, client);
- auto* e = dynamic_cast<TEvYdbProxy::TEvTopicEndPartition*>(actorOps.Events[0].second);
+ auto* e = dynamic_cast<TEvYdbProxy::TEvEndTopicPartition*>(actorOps.Events[0].second);
UNIT_ASSERT(e);
UNIT_ASSERT_VALUES_EQUAL(e->Result.AdjacentPartitionsIds, TVector<ui64>{1});
UNIT_ASSERT_VALUES_EQUAL(e->Result.ChildPartitionsIds, TVector<ui64>{2});
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
index 87484335ab7..8a32687788a 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
@@ -39,6 +39,12 @@ void TEvYdbProxy::TEndTopicPartitionResult::Out(IOutputStream& out) const {
<< " }";
}
+void TEvYdbProxy::TStartTopicReadingSessionResult::Out(IOutputStream& out) const {
+ out << "{"
+ << " ReadSessionId: " << ReadSessionId
+ << " }";
+}
+
template <typename TDerived>
class TBaseProxyActor: public TActor<TDerived> {
class TRequest;
@@ -205,6 +211,7 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
if (auto* x = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
PartitionEndWatcher.Clear();
x->Confirm();
+ Send(ev->Get()->Sender, new TEvYdbProxy::TEvStartTopicReadingSession(*x), 0, ev->Get()->Cookie);
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (auto* x = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
x->Confirm();
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
index f40a595b91b..dd321417057 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
@@ -53,7 +53,8 @@ struct TEvYdbProxy {
EvTopicReaderGone,
EV_REQUEST_RESPONSE(ReadTopic),
EV_REQUEST_RESPONSE(CommitOffset),
- EvTopicEndPartition,
+ EvEndTopicPartition,
+ EvStartTopicReadingSession,
EvEnd,
};
@@ -203,7 +204,22 @@ struct TEvYdbProxy {
TVector<ui64> ChildPartitionsIds;
};
- struct TEvTopicEndPartition: public TGenericResponse<TEvTopicEndPartition, EvTopicEndPartition, TEndTopicPartitionResult> {
+ struct TEvEndTopicPartition: public TGenericResponse<TEvEndTopicPartition, EvEndTopicPartition, TEndTopicPartitionResult> {
+ using TBase::TBase;
+ };
+
+ struct TStartTopicReadingSessionResult {
+ explicit TStartTopicReadingSessionResult(const NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event)
+ : ReadSessionId(event.GetPartitionSession()->GetReadSessionId())
+ {
+ }
+
+ void Out(IOutputStream& out) const;
+
+ TString ReadSessionId;
+ };
+
+ struct TEvStartTopicReadingSession: public TGenericResponse<TEvStartTopicReadingSession, EvStartTopicReadingSession, TStartTopicReadingSessionResult> {
using TBase::TBase;
};
@@ -244,7 +260,7 @@ struct TEvYdbProxy {
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, TTopicReaderSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, TReadTopicSettings);
- DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, std::string, ui64, std::string, ui64, NYdb::NTopic::TCommitOffsetSettings);
#undef DEFINE_GENERIC_REQUEST_RESPONSE
#undef DEFINE_GENERIC_RESPONSE
diff --git a/ydb/tests/functional/replication/utils.h b/ydb/tests/functional/replication/utils.h
index 0fd899288c5..74d18062335 100644
--- a/ydb/tests/functional/replication/utils.h
+++ b/ydb/tests/functional/replication/utils.h
@@ -32,7 +32,7 @@ inline void Out<NYdb::Dev::TUuidValue>(IOutputStream& os, const NYdb::Dev::TUuid
namespace NReplicationTest {
struct IChecker {
- virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0;
+ virtual void Assert(const std::string& msg, const ::Ydb::Value& value) = 0;
virtual ~IChecker() = default;
};
@@ -43,7 +43,7 @@ struct Checker : public IChecker {
: Expected(std::move(expected))
{}
- void Assert(const TString& msg, const ::Ydb::Value& value) override {
+ void Assert(const std::string& msg, const ::Ydb::Value& value) override {
UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg);
}
@@ -98,7 +98,7 @@ inline TUuidValue Checker<TUuidValue>::Get(const ::Ydb::Value& value) {
}
template<typename T>
-std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
+std::pair<TString, std::shared_ptr<IChecker>> _C(std::string&& name, T&& expected) {
return {
std::move(name),
std::make_shared<Checker<T>>(std::move(expected))
@@ -106,7 +106,7 @@ std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
}
template<typename C, typename T>
-std::pair<TString, std::shared_ptr<IChecker>> _T(TString&& name, T&& expected) {
+std::pair<TString, std::shared_ptr<IChecker>> _T(std::string&& name, T&& expected) {
return {
std::move(name),
std::make_shared<C>(std::move(expected))
@@ -116,8 +116,8 @@ std::pair<TString, std::shared_ptr<IChecker>> _T(TString&& name, T&& expected) {
struct TMessage {
TString Message;
std::optional<ui32> Partition = std::nullopt;
- std::optional<TString> ProducerId = std::nullopt;
- std::optional<TString> MessageGroupId = std::nullopt;
+ std::optional<std::string> ProducerId = std::nullopt;
+ std::optional<std::string> MessageGroupId = std::nullopt;
std::optional<ui64> SeqNo = std::nullopt;
};
@@ -131,7 +131,7 @@ inline TMessage _withSeqNo(ui64 seqNo) {
};
}
-inline TMessage _withProducerId(const TString& producerId) {
+inline TMessage _withProducerId(const std::string& producerId) {
return {
.Message = TStringBuilder() << "Message-" << producerId,
.Partition = 0,
@@ -141,7 +141,7 @@ inline TMessage _withProducerId(const TString& producerId) {
};
}
-inline TMessage _withMessageGroupId(const TString& messageGroupId) {
+inline TMessage _withMessageGroupId(const std::string& messageGroupId) {
return {
.Message = TStringBuilder() << "Message-" << messageGroupId,
.Partition = 0,
@@ -172,7 +172,7 @@ struct MainTestCase {
, TableName(TStringBuilder() << "Table_" << Id)
, ReplicationName(TStringBuilder() << "Replication_" << Id)
, TransferName(TStringBuilder() << "Transfer_" << Id)
- , Driver(TDriverConfig(ConnectionString))
+ , Driver(TDriverConfig(ConnectionString)/*.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_INFO).Release()))*/)
, TableClient(Driver)
, Session(TableClient.GetSession().GetValueSync().GetSession())
, TopicClient(Driver)
@@ -183,11 +183,11 @@ struct MainTestCase {
Driver.Stop(true);
}
- void ExecuteDDL(const TString& ddl, bool checkResult = true, const TString& expectedMessage = "") {
+ void ExecuteDDL(const std::string& ddl, bool checkResult = true, const std::string& expectedMessage = "") {
Cerr << "DDL: " << ddl << Endl << Flush;
auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
if (checkResult) {
- if (expectedMessage) {
+ if (!expectedMessage.empty()) {
UNIT_ASSERT(!res.IsSuccess());
Cerr << ">>>>> ACTUAL: " << res.GetIssues().ToOneLineString() << Endl << Flush;
Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush;
@@ -198,21 +198,33 @@ struct MainTestCase {
}
}
- auto ExecuteSourceTableQuery(const TString& query) {
+ auto ExecuteQuery(const std::string& query, bool retry = true) {
for (size_t i = 10; i--;) {
- auto q = Sprintf(query.data(), SourceTableName.data());
- Cerr << ">>>>> Query: " << q << Endl << Flush;
- auto res = Session.ExecuteQuery(q, TTxControl::NoTx()).GetValueSync();
- if (res.IsSuccess()) {
- return;
+ Cerr << ">>>>> Query: " << query << Endl << Flush;
+ auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ if (!res.IsSuccess()) {
+ Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush;
+ }
+ if (res.IsSuccess() || !retry) {
+ return res;
}
UNIT_ASSERT_C(i, res.GetIssues().ToString());
Sleep(TDuration::Seconds(1));
}
+
+ Y_UNREACHABLE();
}
- void CreateTable(const TString& tableDDL) {
+ auto ExecuteTableQuery(const std::string& query) {
+ return ExecuteQuery(Sprintf(query.data(), TableName.data()));
+ }
+
+ auto ExecuteSourceTableQuery(const std::string& query) {
+ return ExecuteQuery(Sprintf(query.data(), SourceTableName.data()));
+ }
+
+ void CreateTable(const std::string& tableDDL) {
ExecuteDDL(Sprintf(tableDDL.data(), TableName.data()));
}
@@ -220,7 +232,7 @@ struct MainTestCase {
ExecuteDDL(Sprintf("DROP TABLE `%s`", TableName.data()));
}
- void CreateSourceTable(const TString& tableDDL) {
+ void CreateSourceTable(const std::string& tableDDL) {
ExecuteDDL(Sprintf(tableDDL.data(), SourceTableName.data()));
}
@@ -251,13 +263,20 @@ struct MainTestCase {
ExecuteDDL(Sprintf("DROP TOPIC `%s`", TopicName.data()));
}
- void CreateConsumer(const TString& consumerName) {
+ void CreateConsumer(const std::string& consumerName) {
ExecuteDDL(Sprintf(R"(
ALTER TOPIC `%s`
ADD CONSUMER `%s`;
)", TopicName.data(), consumerName.data()));
}
+ void DropConsumer(const std::string& consumerName) {
+ ExecuteDDL(Sprintf(R"(
+ ALTER TOPIC `%s`
+ DROP CONSUMER `%s`;
+ )", TopicName.data(), consumerName.data()));
+ }
+
struct CreateTransferSettings {
std::optional<TString> TopicName = std::nullopt;
std::optional<TString> ConsumerName = std::nullopt;
@@ -266,14 +285,14 @@ struct MainTestCase {
CreateTransferSettings() {};
- static CreateTransferSettings WithTopic(const TString& topicName, std::optional<TString> consumerName = std::nullopt) {
+ static CreateTransferSettings WithTopic(const std::string& topicName, std::optional<TString> consumerName = std::nullopt) {
CreateTransferSettings result;
result.TopicName = topicName;
result.ConsumerName = consumerName;
return result;
}
- static CreateTransferSettings WithConsumerName(const TString& consumerName) {
+ static CreateTransferSettings WithConsumerName(const std::string& consumerName) {
CreateTransferSettings result;
result.ConsumerName = consumerName;
return result;
@@ -287,7 +306,7 @@ struct MainTestCase {
}
};
- void CreateTransfer(const TString& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) {
+ void CreateTransfer(const std::string& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) {
TStringBuilder sb;
if (settings.ConsumerName) {
sb << ", CONSUMER = '" << *settings.ConsumerName << "'" << Endl;
@@ -331,14 +350,14 @@ struct MainTestCase {
return result;
}
- static AlterTransferSettings WithTransformLambda(const TString& lambda) {
+ static AlterTransferSettings WithTransformLambda(const std::string& lambda) {
AlterTransferSettings result;
result.TransformLambda = lambda;
return result;
}
};
- void AlterTransfer(const TString& lambda) {
+ void AlterTransfer(const std::string& lambda) {
AlterTransfer(AlterTransferSettings::WithTransformLambda(lambda));
}
@@ -400,6 +419,30 @@ struct MainTestCase {
return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
}
+ auto DescribeConsumer(const std::string& consumerName) {
+ TDescribeConsumerSettings settings;
+ settings.IncludeLocation(true);
+ settings.IncludeStats(true);
+
+ auto c = TopicClient.DescribeConsumer(TopicName, consumerName, settings).GetValueSync();
+ UNIT_ASSERT(c.IsSuccess());
+ return c;
+ }
+
+ auto DescribeConsumer() {
+ auto topic = DescribeTopic();
+ auto consumers = topic.GetTopicDescription().GetConsumers();
+ UNIT_ASSERT_VALUES_EQUAL(1, consumers.size());
+ return DescribeConsumer(consumers[0].GetConsumerName());
+ }
+
+ void CheckCommittedOffset(size_t partitionId, size_t expectedOffset) {
+ auto d = DescribeConsumer();
+ UNIT_ASSERT(d.IsSuccess());
+ auto s = d.GetConsumerDescription().GetPartitions().at(partitionId).GetPartitionConsumerStats();
+ UNIT_ASSERT_VALUES_EQUAL(expectedOffset, s->GetCommittedOffset());
+ }
+
void CreateReplication() {
auto ddl = Sprintf(R"(
CREATE ASYNC REPLICATION `%s`
@@ -457,7 +500,7 @@ struct MainTestCase {
)", ReplicationName.data()));
}
- auto DescribeTopic() {
+ TDescribeTopicResult DescribeTopic() {
TDescribeTopicSettings settings;
settings.IncludeLocation(true);
settings.IncludeStats(true);
@@ -496,11 +539,8 @@ struct MainTestCase {
}
- auto query = Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data());
- Cerr << ">>>>> Query: " << query << Endl << Flush;
- auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ auto res = ExecuteQuery(Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data()), false);
if (!res.IsSuccess()) {
- Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush;
TResultSet r{Ydb::ResultSet()};
return {-1, NYdb::TProtoAccessor::GetProto(r)};
}
@@ -547,7 +587,7 @@ struct MainTestCase {
Y_UNREACHABLE();
}
- void CheckTransferStateError(const TString& expectedMessage) {
+ void CheckTransferStateError(const std::string& expectedMessage) {
auto result = CheckTransferState(TReplicationDescription::EState::Error);
Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush;