diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-04-24 13:27:27 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-24 13:27:27 +0500 |
commit | 0ddb5fd0f317eb1b835bdcf4d73e4ba05029d8db (patch) | |
tree | fd5af81c849ae176ea939b30867b6f6ef8dc20c8 | |
parent | 305b5cdccc9c2f43a8621aacdf120d1ded1b793c (diff) | |
download | ydb-0ddb5fd0f317eb1b835bdcf4d73e4ba05029d8db.tar.gz |
Fixed committing of proccessed messages by transfer (#17580)
-rw-r--r-- | ydb/core/tx/replication/service/service.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 39 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 22 | ||||
-rw-r--r-- | ydb/tests/functional/replication/utils.h | 102 |
9 files changed, 176 insertions, 41 deletions
diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp index 2d4a7c93110..64cdef9d96b 100644 --- a/ydb/core/tx/replication/service/service.cpp +++ b/ydb/core/tx/replication/service/service.cpp @@ -377,7 +377,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> { return it->second; } - std::function<IActor*(void)> ReaderFn(const NKikimrReplication::TRemoteTopicReaderSettings& settings) { + std::function<IActor*(void)> ReaderFn(const NKikimrReplication::TRemoteTopicReaderSettings& settings, bool autoCommit) { TActorId ydbProxy; const auto& params = settings.GetConnectionParams(); switch (params.GetCredentialsCase()) { @@ -394,6 +394,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> { auto topicReaderSettings = TEvYdbProxy::TTopicReaderSettings() .MaxMemoryUsageBytes(1_MB) .ConsumerName(settings.GetConsumerName()) + .AutoCommit(autoCommit) .AppendTopics(NYdb::NTopic::TTopicReadSettings() .Path(settings.GetTopicPath()) .AppendPartitionIds(settings.GetTopicPartitionId()) @@ -473,6 +474,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> { const auto& cmd = record.GetCommand(); // TODO: validate settings const auto& readerSettings = cmd.GetRemoteTopicReader(); + bool autoCommit = true; std::function<IActor*(void)> writerFn; if (cmd.HasLocalTableWriter()) { const auto& writerSettings = cmd.GetLocalTableWriter(); @@ -485,12 +487,13 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> { LOG_C("Run transfer but TransferWriterFactory does not exists."); return; } + autoCommit = false; writerFn = TransferWriterFn(writerSettings, transferWriterFactory); } else { Y_ABORT("Unsupported"); } const auto actorId = session.RegisterWorker(this, id, - CreateWorker(SelfId(), ReaderFn(readerSettings), std::move(writerFn))); + CreateWorker(SelfId(), ReaderFn(readerSettings, autoCommit), std::move(writerFn))); WorkerActorIdToSession[actorId] = controller.GetTabletId(); } diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index e25c3b708f6..d7fde434a96 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -68,13 +68,44 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records))); } - void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) { + void Handle(TEvYdbProxy::TEvEndTopicPartition::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto& result = ev->Get()->Result; Send(Worker, new TEvWorker::TEvDataEnd(result.PartitionId, std::move(result.AdjacentPartitionsIds), std::move(result.ChildPartitionsIds))); } + void Handle(TEvYdbProxy::TEvStartTopicReadingSession::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + ReadSessionId = ev->Get()->Result.ReadSessionId; + } + + void Handle(TEvWorker::TEvCommit::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + Y_ABORT_UNLESS(YdbProxy); + Y_ABORT_UNLESS(ReadSessionId); + + auto settings = NYdb::NTopic::TCommitOffsetSettings() + .ReadSessionId(ReadSessionId); + + const auto& topicName = Settings.GetBase().Topics_.at(0).Path_; + const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0); + const auto& consumerName = Settings.GetBase().ConsumerName_; + + Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(topicName, partitionId, consumerName, ev->Get()->Offset, std::move(settings))); + } + + void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) { + if (!ev->Get()->Result.IsSuccess()) { + LOG_W("Handle " << ev->Get()->ToString()); + return Leave(TEvWorker::TEvGone::UNAVAILABLE); + } else { + LOG_D("Handle " << ev->Get()->ToString()); + } + } + void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); @@ -121,9 +152,12 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvPoll, Handle); + hFunc(TEvWorker::TEvCommit, Handle); hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle); hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle); - hFunc(TEvYdbProxy::TEvTopicEndPartition, Handle); + hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle); + hFunc(TEvYdbProxy::TEvStartTopicReadingSession, Handle); + hFunc(TEvYdbProxy::TEvEndTopicPartition, Handle); hFunc(TEvYdbProxy::TEvTopicReaderGone, Handle); sFunc(TEvents::TEvPoison, PassAway); } @@ -136,6 +170,7 @@ private: TActorId Worker; TActorId ReadSession; + TString ReadSessionId; }; // TRemoteTopicReader diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index d66eacf1ce3..de1338b733b 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -25,6 +25,17 @@ TString TEvWorker::TEvPoll::ToString() const { << " }"; } +TEvWorker::TEvCommit::TEvCommit(size_t offset) + : Offset(offset) +{ +} + +TString TEvWorker::TEvCommit::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Offset: " << Offset + << " }"; +} + TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records) : PartitionId(partitionId) , Source(source) @@ -187,6 +198,20 @@ class TWorker: public TActorBootstrapped<TWorker> { } } + void Handle(TEvWorker::TEvCommit::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + if (ev->Sender != Writer) { + LOG_W("Commit from unknown actor" + << ": sender# " << ev->Sender); + return; + } + + if (Reader) { + Send(ev->Forward(Reader)); + } + } + void Handle(TEvWorker::TEvData::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); @@ -308,6 +333,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvPoll, Handle); + hFunc(TEvWorker::TEvCommit, Handle); hFunc(TEvWorker::TEvData, Handle); hFunc(TEvWorker::TEvDataEnd, Forward); hFunc(TEvWorker::TEvGone, Handle); diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 1c1dc14133a..11897c9804c 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -24,6 +24,7 @@ struct TEvWorker { EvGone, EvStatus, EvDataEnd, + EvCommit, EvEnd, }; @@ -39,6 +40,13 @@ struct TEvWorker { TString ToString() const override; }; + struct TEvCommit: public TEventLocal<TEvCommit, EvCommit> { + size_t Offset; + + explicit TEvCommit(size_t offset); + TString ToString() const override; + }; + struct TEvData: public TEventLocal<TEvData, EvData> { ui32 PartitionId; TString Source; diff --git a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h index d051440f477..180165afe9c 100644 --- a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h +++ b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h @@ -14,7 +14,7 @@ class TPartitionEndWatcher { return; } - ActorOps->Send(client, new TEvYdbProxy::TEvTopicEndPartition(*EndPartitionSessionEvent)); + ActorOps->Send(client, new TEvYdbProxy::TEvEndTopicPartition(*EndPartitionSessionEvent)); } public: diff --git a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp index f33fa4716ff..a147c798154 100644 --- a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp @@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) { UNIT_ASSERT_VALUES_EQUAL(actorOps.Events.size(), 1); UNIT_ASSERT_VALUES_EQUAL(actorOps.Events[0].first, client); - auto* e = dynamic_cast<TEvYdbProxy::TEvTopicEndPartition*>(actorOps.Events[0].second); + auto* e = dynamic_cast<TEvYdbProxy::TEvEndTopicPartition*>(actorOps.Events[0].second); UNIT_ASSERT(e); UNIT_ASSERT_VALUES_EQUAL(e->Result.AdjacentPartitionsIds, TVector<ui64>{1}); UNIT_ASSERT_VALUES_EQUAL(e->Result.ChildPartitionsIds, TVector<ui64>{2}); @@ -120,7 +120,7 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) { UNIT_ASSERT_VALUES_EQUAL(actorOps.Events.size(), 1); UNIT_ASSERT_VALUES_EQUAL(actorOps.Events[0].first, client); - auto* e = dynamic_cast<TEvYdbProxy::TEvTopicEndPartition*>(actorOps.Events[0].second); + auto* e = dynamic_cast<TEvYdbProxy::TEvEndTopicPartition*>(actorOps.Events[0].second); UNIT_ASSERT(e); UNIT_ASSERT_VALUES_EQUAL(e->Result.AdjacentPartitionsIds, TVector<ui64>{1}); UNIT_ASSERT_VALUES_EQUAL(e->Result.ChildPartitionsIds, TVector<ui64>{2}); diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index 87484335ab7..8a32687788a 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -39,6 +39,12 @@ void TEvYdbProxy::TEndTopicPartitionResult::Out(IOutputStream& out) const { << " }"; } +void TEvYdbProxy::TStartTopicReadingSessionResult::Out(IOutputStream& out) const { + out << "{" + << " ReadSessionId: " << ReadSessionId + << " }"; +} + template <typename TDerived> class TBaseProxyActor: public TActor<TDerived> { class TRequest; @@ -205,6 +211,7 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> { if (auto* x = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { PartitionEndWatcher.Clear(); x->Confirm(); + Send(ev->Get()->Sender, new TEvYdbProxy::TEvStartTopicReadingSession(*x), 0, ev->Get()->Cookie); return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie); } else if (auto* x = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) { x->Confirm(); diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index f40a595b91b..dd321417057 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -53,7 +53,8 @@ struct TEvYdbProxy { EvTopicReaderGone, EV_REQUEST_RESPONSE(ReadTopic), EV_REQUEST_RESPONSE(CommitOffset), - EvTopicEndPartition, + EvEndTopicPartition, + EvStartTopicReadingSession, EvEnd, }; @@ -203,7 +204,22 @@ struct TEvYdbProxy { TVector<ui64> ChildPartitionsIds; }; - struct TEvTopicEndPartition: public TGenericResponse<TEvTopicEndPartition, EvTopicEndPartition, TEndTopicPartitionResult> { + struct TEvEndTopicPartition: public TGenericResponse<TEvEndTopicPartition, EvEndTopicPartition, TEndTopicPartitionResult> { + using TBase::TBase; + }; + + struct TStartTopicReadingSessionResult { + explicit TStartTopicReadingSessionResult(const NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) + : ReadSessionId(event.GetPartitionSession()->GetReadSessionId()) + { + } + + void Out(IOutputStream& out) const; + + TString ReadSessionId; + }; + + struct TEvStartTopicReadingSession: public TGenericResponse<TEvStartTopicReadingSession, EvStartTopicReadingSession, TStartTopicReadingSessionResult> { using TBase::TBase; }; @@ -244,7 +260,7 @@ struct TEvYdbProxy { DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings); DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, TTopicReaderSettings); DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, TReadTopicSettings); - DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, std::string, ui64, std::string, ui64, NYdb::NTopic::TCommitOffsetSettings); #undef DEFINE_GENERIC_REQUEST_RESPONSE #undef DEFINE_GENERIC_RESPONSE diff --git a/ydb/tests/functional/replication/utils.h b/ydb/tests/functional/replication/utils.h index 0fd899288c5..74d18062335 100644 --- a/ydb/tests/functional/replication/utils.h +++ b/ydb/tests/functional/replication/utils.h @@ -32,7 +32,7 @@ inline void Out<NYdb::Dev::TUuidValue>(IOutputStream& os, const NYdb::Dev::TUuid namespace NReplicationTest { struct IChecker { - virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0; + virtual void Assert(const std::string& msg, const ::Ydb::Value& value) = 0; virtual ~IChecker() = default; }; @@ -43,7 +43,7 @@ struct Checker : public IChecker { : Expected(std::move(expected)) {} - void Assert(const TString& msg, const ::Ydb::Value& value) override { + void Assert(const std::string& msg, const ::Ydb::Value& value) override { UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg); } @@ -98,7 +98,7 @@ inline TUuidValue Checker<TUuidValue>::Get(const ::Ydb::Value& value) { } template<typename T> -std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) { +std::pair<TString, std::shared_ptr<IChecker>> _C(std::string&& name, T&& expected) { return { std::move(name), std::make_shared<Checker<T>>(std::move(expected)) @@ -106,7 +106,7 @@ std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) { } template<typename C, typename T> -std::pair<TString, std::shared_ptr<IChecker>> _T(TString&& name, T&& expected) { +std::pair<TString, std::shared_ptr<IChecker>> _T(std::string&& name, T&& expected) { return { std::move(name), std::make_shared<C>(std::move(expected)) @@ -116,8 +116,8 @@ std::pair<TString, std::shared_ptr<IChecker>> _T(TString&& name, T&& expected) { struct TMessage { TString Message; std::optional<ui32> Partition = std::nullopt; - std::optional<TString> ProducerId = std::nullopt; - std::optional<TString> MessageGroupId = std::nullopt; + std::optional<std::string> ProducerId = std::nullopt; + std::optional<std::string> MessageGroupId = std::nullopt; std::optional<ui64> SeqNo = std::nullopt; }; @@ -131,7 +131,7 @@ inline TMessage _withSeqNo(ui64 seqNo) { }; } -inline TMessage _withProducerId(const TString& producerId) { +inline TMessage _withProducerId(const std::string& producerId) { return { .Message = TStringBuilder() << "Message-" << producerId, .Partition = 0, @@ -141,7 +141,7 @@ inline TMessage _withProducerId(const TString& producerId) { }; } -inline TMessage _withMessageGroupId(const TString& messageGroupId) { +inline TMessage _withMessageGroupId(const std::string& messageGroupId) { return { .Message = TStringBuilder() << "Message-" << messageGroupId, .Partition = 0, @@ -172,7 +172,7 @@ struct MainTestCase { , TableName(TStringBuilder() << "Table_" << Id) , ReplicationName(TStringBuilder() << "Replication_" << Id) , TransferName(TStringBuilder() << "Transfer_" << Id) - , Driver(TDriverConfig(ConnectionString)) + , Driver(TDriverConfig(ConnectionString)/*.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_INFO).Release()))*/) , TableClient(Driver) , Session(TableClient.GetSession().GetValueSync().GetSession()) , TopicClient(Driver) @@ -183,11 +183,11 @@ struct MainTestCase { Driver.Stop(true); } - void ExecuteDDL(const TString& ddl, bool checkResult = true, const TString& expectedMessage = "") { + void ExecuteDDL(const std::string& ddl, bool checkResult = true, const std::string& expectedMessage = "") { Cerr << "DDL: " << ddl << Endl << Flush; auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync(); if (checkResult) { - if (expectedMessage) { + if (!expectedMessage.empty()) { UNIT_ASSERT(!res.IsSuccess()); Cerr << ">>>>> ACTUAL: " << res.GetIssues().ToOneLineString() << Endl << Flush; Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush; @@ -198,21 +198,33 @@ struct MainTestCase { } } - auto ExecuteSourceTableQuery(const TString& query) { + auto ExecuteQuery(const std::string& query, bool retry = true) { for (size_t i = 10; i--;) { - auto q = Sprintf(query.data(), SourceTableName.data()); - Cerr << ">>>>> Query: " << q << Endl << Flush; - auto res = Session.ExecuteQuery(q, TTxControl::NoTx()).GetValueSync(); - if (res.IsSuccess()) { - return; + Cerr << ">>>>> Query: " << query << Endl << Flush; + auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + if (!res.IsSuccess()) { + Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush; + } + if (res.IsSuccess() || !retry) { + return res; } UNIT_ASSERT_C(i, res.GetIssues().ToString()); Sleep(TDuration::Seconds(1)); } + + Y_UNREACHABLE(); } - void CreateTable(const TString& tableDDL) { + auto ExecuteTableQuery(const std::string& query) { + return ExecuteQuery(Sprintf(query.data(), TableName.data())); + } + + auto ExecuteSourceTableQuery(const std::string& query) { + return ExecuteQuery(Sprintf(query.data(), SourceTableName.data())); + } + + void CreateTable(const std::string& tableDDL) { ExecuteDDL(Sprintf(tableDDL.data(), TableName.data())); } @@ -220,7 +232,7 @@ struct MainTestCase { ExecuteDDL(Sprintf("DROP TABLE `%s`", TableName.data())); } - void CreateSourceTable(const TString& tableDDL) { + void CreateSourceTable(const std::string& tableDDL) { ExecuteDDL(Sprintf(tableDDL.data(), SourceTableName.data())); } @@ -251,13 +263,20 @@ struct MainTestCase { ExecuteDDL(Sprintf("DROP TOPIC `%s`", TopicName.data())); } - void CreateConsumer(const TString& consumerName) { + void CreateConsumer(const std::string& consumerName) { ExecuteDDL(Sprintf(R"( ALTER TOPIC `%s` ADD CONSUMER `%s`; )", TopicName.data(), consumerName.data())); } + void DropConsumer(const std::string& consumerName) { + ExecuteDDL(Sprintf(R"( + ALTER TOPIC `%s` + DROP CONSUMER `%s`; + )", TopicName.data(), consumerName.data())); + } + struct CreateTransferSettings { std::optional<TString> TopicName = std::nullopt; std::optional<TString> ConsumerName = std::nullopt; @@ -266,14 +285,14 @@ struct MainTestCase { CreateTransferSettings() {}; - static CreateTransferSettings WithTopic(const TString& topicName, std::optional<TString> consumerName = std::nullopt) { + static CreateTransferSettings WithTopic(const std::string& topicName, std::optional<TString> consumerName = std::nullopt) { CreateTransferSettings result; result.TopicName = topicName; result.ConsumerName = consumerName; return result; } - static CreateTransferSettings WithConsumerName(const TString& consumerName) { + static CreateTransferSettings WithConsumerName(const std::string& consumerName) { CreateTransferSettings result; result.ConsumerName = consumerName; return result; @@ -287,7 +306,7 @@ struct MainTestCase { } }; - void CreateTransfer(const TString& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) { + void CreateTransfer(const std::string& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) { TStringBuilder sb; if (settings.ConsumerName) { sb << ", CONSUMER = '" << *settings.ConsumerName << "'" << Endl; @@ -331,14 +350,14 @@ struct MainTestCase { return result; } - static AlterTransferSettings WithTransformLambda(const TString& lambda) { + static AlterTransferSettings WithTransformLambda(const std::string& lambda) { AlterTransferSettings result; result.TransformLambda = lambda; return result; } }; - void AlterTransfer(const TString& lambda) { + void AlterTransfer(const std::string& lambda) { AlterTransfer(AlterTransferSettings::WithTransformLambda(lambda)); } @@ -400,6 +419,30 @@ struct MainTestCase { return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync(); } + auto DescribeConsumer(const std::string& consumerName) { + TDescribeConsumerSettings settings; + settings.IncludeLocation(true); + settings.IncludeStats(true); + + auto c = TopicClient.DescribeConsumer(TopicName, consumerName, settings).GetValueSync(); + UNIT_ASSERT(c.IsSuccess()); + return c; + } + + auto DescribeConsumer() { + auto topic = DescribeTopic(); + auto consumers = topic.GetTopicDescription().GetConsumers(); + UNIT_ASSERT_VALUES_EQUAL(1, consumers.size()); + return DescribeConsumer(consumers[0].GetConsumerName()); + } + + void CheckCommittedOffset(size_t partitionId, size_t expectedOffset) { + auto d = DescribeConsumer(); + UNIT_ASSERT(d.IsSuccess()); + auto s = d.GetConsumerDescription().GetPartitions().at(partitionId).GetPartitionConsumerStats(); + UNIT_ASSERT_VALUES_EQUAL(expectedOffset, s->GetCommittedOffset()); + } + void CreateReplication() { auto ddl = Sprintf(R"( CREATE ASYNC REPLICATION `%s` @@ -457,7 +500,7 @@ struct MainTestCase { )", ReplicationName.data())); } - auto DescribeTopic() { + TDescribeTopicResult DescribeTopic() { TDescribeTopicSettings settings; settings.IncludeLocation(true); settings.IncludeStats(true); @@ -496,11 +539,8 @@ struct MainTestCase { } - auto query = Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data()); - Cerr << ">>>>> Query: " << query << Endl << Flush; - auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + auto res = ExecuteQuery(Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data()), false); if (!res.IsSuccess()) { - Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush; TResultSet r{Ydb::ResultSet()}; return {-1, NYdb::TProtoAccessor::GetProto(r)}; } @@ -547,7 +587,7 @@ struct MainTestCase { Y_UNREACHABLE(); } - void CheckTransferStateError(const TString& expectedMessage) { + void CheckTransferStateError(const std::string& expectedMessage) { auto result = CheckTransferState(TReplicationDescription::EState::Error); Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush; Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush; |