diff options
author | abcdef <akotov@ydb.tech> | 2023-07-17 11:13:01 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-07-17 11:13:01 +0300 |
commit | 9cdf0064f0438ac4c9eb64bcfc707101df43c7ca (patch) | |
tree | a2527bc1fdadc7742085218598d00542509e1105 | |
parent | 7dfa215b0e36a9e0e1558162543aaa81123f9ec0 (diff) | |
download | ydb-9cdf0064f0438ac4c9eb64bcfc707101df43c7ca.tar.gz |
new GetEvents and GetEvent methods
новые методы GetEvents и GetEvent
15 files changed, 538 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt index efe49cc4b5..afd2882e39 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt @@ -29,6 +29,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC api-grpc api-grpc-draft api-protos + cpp-client-ydb_table ) target_sources(cpp-client-ydb_topic PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt index 3de6140a52..e62148bcc9 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt @@ -30,6 +30,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC api-grpc api-grpc-draft api-protos + cpp-client-ydb_table ) target_sources(cpp-client-ydb_topic PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt index 3de6140a52..e62148bcc9 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt @@ -30,6 +30,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC api-grpc api-grpc-draft api-protos + cpp-client-ydb_table ) target_sources(cpp-client-ydb_topic PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt index efe49cc4b5..afd2882e39 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt @@ -29,6 +29,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC api-grpc api-grpc-draft api-protos + cpp-client-ydb_table ) target_sources(cpp-client-ydb_topic PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index 8868748dcc..00520152e0 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -125,6 +125,21 @@ TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<si return res; } +TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(const TReadSessionGetEventSettings& settings) +{ + auto events = GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_); + if (!events.empty() && settings.Tx_) { + auto& tx = settings.Tx_->get(); + for (auto& event : events) { + if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) { + CollectOffsets(tx, *dataEvent); + } + } + UpdateOffsets(tx); + } + return events; +} + TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) { auto res = EventsQueue->GetEvent(block, maxByteSize); if (EventsQueue->IsClosed()) { @@ -133,6 +148,95 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB return res; } +TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(const TReadSessionGetEventSettings& settings) +{ + auto event = GetEvent(settings.Block_, settings.MaxByteSize_); + if (event) { + auto& tx = settings.Tx_->get(); + if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) { + CollectOffsets(tx, *dataEvent); + } + UpdateOffsets(tx); + } + return event; +} + +void TReadSession::CollectOffsets(NTable::TTransaction& tx, + const TReadSessionEvent::TDataReceivedEvent& event) +{ + const auto& session = *event.GetPartitionSession(); + + if (event.HasCompressedMessages()) { + for (auto& message : event.GetCompressedMessages()) { + CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset()); + } + } else { + for (auto& message : event.GetMessages()) { + CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset()); + } + } +} + +void TReadSession::CollectOffsets(NTable::TTransaction& tx, + const TString& topicPath, ui32 partitionId, ui64 offset) +{ + const TString& sessionId = tx.GetSession().GetId(); + const TString& txId = tx.GetId(); + TOffsetRanges& ranges = OffsetRanges[std::make_pair(sessionId, txId)]; + ranges[topicPath][partitionId].InsertInterval(offset, offset + 1); +} + +void TReadSession::UpdateOffsets(const NTable::TTransaction& tx) +{ + const TString& sessionId = tx.GetSession().GetId(); + const TString& txId = tx.GetId(); + + auto p = OffsetRanges.find(std::make_pair(sessionId, txId)); + if (p == OffsetRanges.end()) { + return; + } + + TVector<TTopicOffsets> topics; + for (auto& [path, partitions] : p->second) { + TTopicOffsets topic; + topic.Path = path; + + topics.push_back(std::move(topic)); + + for (auto& [id, ranges] : partitions) { + TPartitionOffsets partition; + partition.PartitionId = id; + + TTopicOffsets& t = topics.back(); + t.Partitions.push_back(std::move(partition)); + + for (auto& range : ranges) { + TPartitionOffsets& p = t.Partitions.back(); + + TOffsetsRange r; + r.Start = range.first; + r.End = range.second; + + p.Offsets.push_back(r); + } + } + } + + Y_VERIFY(!topics.empty()); + + auto result = Client->UpdateOffsetsInTransaction(tx, + topics, + Settings.ConsumerName_, + {}).GetValueSync(); + Y_VERIFY(!result.IsTransportError()); + + if (!result.IsSuccess()) { + ythrow yexception() << "error on update offsets: " << result; + } + + OffsetRanges.erase(std::make_pair(sessionId, txId)); +} + bool TReadSession::Close(TDuration timeout) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); // Log final counters. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h index e49bc9bdd6..064a395a69 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -31,6 +31,12 @@ public: return {}; } + TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) override { + return GetEvents(settings.Block_, + settings.MaxEventsCount_, + settings.MaxByteSize_); + } + inline TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override { Y_VERIFY(false); @@ -39,6 +45,11 @@ public: return {}; } + TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) override { + return GetEvent(settings.Block_, + settings.MaxByteSize_); + } + inline bool Close(TDuration timeout) override { Y_VERIFY(false); @@ -72,8 +83,13 @@ public: void Start(); NThreading::TFuture<void> WaitEvent() override; - TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override; - TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override; + TVector<TReadSessionEvent::TEvent> GetEvents(bool block, + TMaybe<size_t> maxEventsCount, + size_t maxByteSize) override; + TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) override; + TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, + size_t maxByteSize) override; + TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) override; bool Close(TDuration timeout) override; @@ -113,6 +129,19 @@ private: void AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred); private: + using TOffsetRanges = THashMap<TString, THashMap<ui64, TDisjointIntervalTree<ui64>>>; + + void CollectOffsets(NTable::TTransaction& tx, + const TReadSessionEvent::TDataReceivedEvent& event); + void CollectOffsets(NTable::TTransaction& tx, + const TString& topicPath, ui32 partitionId, ui64 offset); + void UpdateOffsets(const NTable::TTransaction& tx); + + // + // (session, tx) -> topic -> partition -> (begin, end) + // + THashMap<std::pair<TString, TString>, TOffsetRanges> OffsetRanges; + TReadSessionSettings Settings; const TString SessionId; const TInstant StartSessionTime = TInstant::Now(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index d893dc4fe2..5c42758df9 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -14,6 +14,25 @@ namespace NYdb::NTopic { +struct TOffsetsRange { + ui64 Start; + ui64 End; +}; + +struct TPartitionOffsets { + ui64 PartitionId; + TVector<TOffsetsRange> Offsets; +}; + +struct TTopicOffsets { + TString Path; + TVector<TPartitionOffsets> Partitions; +}; + +struct TUpdateOffsetsInTransactionSettings : public TOperationRequestSettings<TUpdateOffsetsInTransactionSettings> { + using TOperationRequestSettings<TUpdateOffsetsInTransactionSettings>::TOperationRequestSettings; +}; + class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> { public: // Constructor for main client. @@ -246,6 +265,41 @@ public: TRpcRequestSettings::Make(settings)); } + TAsyncStatus UpdateOffsetsInTransaction(const NTable::TTransaction& tx, + const TVector<TTopicOffsets>& topics, + const TString& consumerName, + const TUpdateOffsetsInTransactionSettings& settings) + { + auto request = MakeOperationRequest<Ydb::Topic::UpdateOffsetsInTransactionRequest>(settings); + + request.mutable_tx()->set_id(tx.GetId()); + request.mutable_tx()->set_session(tx.GetSession().GetId()); + + for (auto& t : topics) { + auto* topic = request.mutable_topics()->Add(); + topic->set_path(t.Path); + + for (auto& p : t.Partitions) { + auto* partition = topic->mutable_partitions()->Add(); + partition->set_partition_id(p.PartitionId); + + for (auto& r : p.Offsets) { + auto *range = partition->mutable_partition_offsets()->Add(); + range->set_start(r.Start); + range->set_end(r.End); + } + } + } + + request.set_consumer(consumerName); + + return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::UpdateOffsetsInTransactionRequest, Ydb::Topic::UpdateOffsetsInTransactionResponse>( + std::move(request), + &Ydb::Topic::V1::TopicService::Stub::AsyncUpdateOffsetsInTransaction, + TRpcRequestSettings::Make(settings) + ); + } + // Runtime API. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 751ca9d7cf..6e9874cd6d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -24,6 +24,10 @@ namespace NYdb { namespace NScheme { struct TPermissions; } + + namespace NTable { + class TTransaction; + } } namespace NYdb::NTopic { @@ -1535,6 +1539,15 @@ public: virtual ~IWriteSession() = default; }; +struct TReadSessionGetEventSettings : public TCommonClientSettingsBase<TReadSessionGetEventSettings> { + using TSelf = TReadSessionGetEventSettings; + + FLUENT_SETTING_DEFAULT(bool, Block, false); + FLUENT_SETTING_OPTIONAL(size_t, MaxEventsCount); + FLUENT_SETTING_DEFAULT(size_t, MaxByteSize, std::numeric_limits<size_t>::max()); + FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx); +}; + class IReadSession { public: //! Main reader loop. @@ -1554,10 +1567,14 @@ public: virtual TVector<TReadSessionEvent::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0; + virtual TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) = 0; + //! Get single event. virtual TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0; + virtual TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) = 0; + //! Close read session. //! Waits for all commit acknowledgments to arrive. //! Force close after timeout. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt index 7d8e23430c..d257f7bc7f 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt @@ -42,6 +42,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt index 614f64ec48..2f974cb1f0 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt @@ -45,6 +45,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt index eebe4771ce..ac1d1c13bf 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt @@ -46,6 +46,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt index 701581fd9b..0105455a74 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt @@ -35,6 +35,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp new file mode 100644 index 0000000000..ce9c3f88ef --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -0,0 +1,322 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> + +#include <library/cpp/logger/stream.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NYdb::NTopic::NTests { + +Y_UNIT_TEST_SUITE(TxUsage) { + +NKikimr::Tests::TServerSettings MakeServerSettings() +{ + auto settings = PQSettings(0); + settings.SetDomainName("Root"); + settings.SetEnableTopicServiceTx(true); + settings.PQConfig.SetTopicsAreFirstClassCitizen(true); + settings.PQConfig.SetRoot("/Root"); + settings.PQConfig.SetDatabase("/Root"); + return settings; +} + +class TEnvironment { +public: + TEnvironment(); + + void CreateTopic(const TString& path, const TString& consumer); + + TString GetEndpoint() const; + TString GetTopicPath(const TString& name) const; + TString GetTopicParent() const; + TString GetDatabase() const; + + const TDriver& GetDriver(); + +private: + TString Database; + ::NPersQueue::TTestServer Server; + TMaybe<TDriver> Driver; +}; + +TEnvironment::TEnvironment() : + Database("/Root"), + Server(MakeServerSettings(), false) +{ + Server.StartServer(true, GetDatabase()); +} + +void TEnvironment::CreateTopic(const TString& path, const TString& consumer) +{ + NTopic::TTopicClient client(GetDriver()); + + NTopic::TCreateTopicSettings topics; + NTopic::TConsumerSettings<NTopic::TCreateTopicSettings> consumers(topics, consumer); + topics.AppendConsumers(consumers); + + auto status = client.CreateTopic(path, topics).GetValueSync(); + UNIT_ASSERT(status.IsSuccess()); +} + +TString TEnvironment::GetEndpoint() const +{ + return "localhost:" + ToString(Server.GrpcPort); +} + +TString TEnvironment::GetTopicPath(const TString& name) const +{ + return GetTopicParent() + "/" + name; +} + +TString TEnvironment::GetTopicParent() const +{ + return GetDatabase(); +} + +TString TEnvironment::GetDatabase() const +{ + return Database; +} + +const TDriver& TEnvironment::GetDriver() +{ + if (!Driver) { + TDriverConfig config; + config.SetEndpoint(GetEndpoint()); + config.SetDatabase(GetDatabase()); + config.SetAuthToken("root@builtin"); + config.SetLog(MakeHolder<TStreamLogBackend>(&Cerr)); + + Driver.ConstructInPlace(config); + } + + return *Driver; +} + +class TFixture : public NUnitTest::TBaseFixture { +protected: + void SetUp(NUnitTest::TTestContext&) override; + + NTable::TSession CreateSession(); + NTable::TTransaction BeginTx(NTable::TSession& session); + void CommitTx(NTable::TTransaction& tx, EStatus status); + + using TTopicReadSession = NTopic::IReadSession; + using TTopicReadSessionPtr = std::shared_ptr<TTopicReadSession>; + + TTopicReadSessionPtr CreateReader(); + + void StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset); + void StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset); + + void ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset); + + void WriteMessage(const TString& data); + +private: + TString GetTopicPath() const; + TString GetTopicName() const; + TString GetConsumerName() const; + TString GetMessageGroupId() const; + + template<class E> + E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); + template<class E> + E ReadEvent(TTopicReadSessionPtr reader); + + const TDriver& GetDriver(); + + std::shared_ptr<TEnvironment> Env; + TMaybe<TDriver> Driver; +}; + +void TFixture::SetUp(NUnitTest::TTestContext&) +{ + Env = std::make_shared<TEnvironment>(); + + Env->CreateTopic(GetTopicPath(), GetConsumerName()); +} + +NTable::TSession TFixture::CreateSession() +{ + NTable::TTableClient client(GetDriver()); + auto result = client.CreateSession().ExtractValueSync(); + return result.GetSession(); +} + +NTable::TTransaction TFixture::BeginTx(NTable::TSession& session) +{ + auto result = session.BeginTransaction().ExtractValueSync(); + return result.GetTransaction(); +} + +void TFixture::CommitTx(NTable::TTransaction& tx, EStatus status) +{ + auto result = tx.Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status); +} + +auto TFixture::CreateReader() -> TTopicReadSessionPtr +{ + NTopic::TTopicClient client(GetDriver()); + TReadSessionSettings options; + options.ConsumerName(GetConsumerName()); + options.AppendTopics(GetTopicPath()); + return client.CreateReadSession(options); +} + +void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset) +{ + auto event = ReadEvent<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(reader, tx); + UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), offset); + event.Confirm(); +} + +void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset) +{ + auto event = ReadEvent<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(reader); + UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), offset); + event.Confirm(); +} + +void TFixture::ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset) +{ + auto event = ReadEvent<NTopic::TReadSessionEvent::TDataReceivedEvent>(reader, tx); + UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset); +} + +template<class E> +E TFixture::ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx) +{ + NTopic::TReadSessionGetEventSettings options; + options.Block(true); + options.MaxEventsCount(1); + options.Tx(tx); + + auto event = reader->GetEvent(options); + UNIT_ASSERT(event); + + auto ev = std::get_if<E>(&*event); + UNIT_ASSERT(ev); + + return *ev; +} + +template<class E> +E TFixture::ReadEvent(TTopicReadSessionPtr reader) +{ + auto event = reader->GetEvent(true, 1); + UNIT_ASSERT(event); + + auto ev = std::get_if<E>(&*event); + UNIT_ASSERT(ev); + + return *ev; +} + +void TFixture::WriteMessage(const TString& data) +{ + NTopic::TWriteSessionSettings options; + options.Path(GetTopicPath()); + options.MessageGroupId(GetMessageGroupId()); + + NTopic::TTopicClient client(GetDriver()); + auto session = client.CreateSimpleBlockingWriteSession(options); + UNIT_ASSERT(session->Write(data)); + session->Close(); +} + +TString TFixture::GetTopicPath() const +{ + return Env->GetTopicPath(GetTopicName()); +} + +TString TFixture::GetTopicName() const +{ + return "my-topic"; +} + +TString TFixture::GetConsumerName() const +{ + return "my-consumer"; +} + +TString TFixture::GetMessageGroupId() const +{ + return "my-message-group"; +} + +const TDriver& TFixture::GetDriver() +{ + return Env->GetDriver(); +} + +Y_UNIT_TEST_F(SessionAbort, TFixture) +{ + { + auto reader = CreateReader(); + auto session = CreateSession(); + auto tx = BeginTx(session); + + StartPartitionSession(reader, tx, 0); + + WriteMessage("message #0"); + ReadMessage(reader, tx, 0); + + WriteMessage("message #1"); + ReadMessage(reader, tx, 1); + } + + { + auto session = CreateSession(); + auto tx = BeginTx(session); + auto reader = CreateReader(); + + StartPartitionSession(reader, tx, 0); + + ReadMessage(reader, tx, 0); + + CommitTx(tx, EStatus::SUCCESS); + } + + { + auto reader = CreateReader(); + + StartPartitionSession(reader, 2); + } +} + +Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) +{ + WriteMessage("message #0"); + + auto session1 = CreateSession(); + auto tx1 = BeginTx(session1); + + { + auto reader = CreateReader(); + + StartPartitionSession(reader, tx1, 0); + + ReadMessage(reader, tx1, 0); + } + + auto session2 = CreateSession(); + auto tx2 = BeginTx(session2); + + { + auto reader = CreateReader(); + + StartPartitionSession(reader, tx2, 0); + + ReadMessage(reader, tx2, 0); + } + + CommitTx(tx2, EStatus::SUCCESS); + CommitTx(tx1, EStatus::ABORTED); +} + +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make index 98728614f6..b1f2718022 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make @@ -33,6 +33,7 @@ SRCS( describe_topic_ut.cpp managed_executor.h managed_executor.cpp + topic_to_table_ut.cpp ) END() diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ya.make index 2c096421b7..190f7ab041 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_topic/ya.make @@ -17,6 +17,7 @@ PEERDIR( ydb/public/api/grpc ydb/public/api/grpc/draft ydb/public/api/protos + ydb/public/sdk/cpp/client/ydb_table ) END() |