diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2024-10-16 15:28:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-16 15:28:51 +0300 |
commit | b2bc73df8676a727babfe3c63bf9e357a48ad2d3 (patch) | |
tree | 1ba04255fcfc6efac6e3d75f241a5f959b14c758 | |
parent | 30ebe5357bb143648c6be4d151ecd4944af81ada (diff) | |
download | ydb-b2bc73df8676a727babfe3c63bf9e357a48ad2d3.tar.gz |
Commit offsets in a transaction and out of a transaction (#10460)
4 files changed, 57 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index 3f75d24139..3916bb0818 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -11,6 +11,13 @@ namespace NYdb::NTopic { static const TString DRIVER_IS_STOPPING_DESCRIPTION = "Driver is stopping"; +void SetReadInTransaction(TReadSessionEvent::TEvent& event) +{ + if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) { + e->SetReadInTransaction(); + } +} + TReadSession::TReadSession(const TReadSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, @@ -137,6 +144,9 @@ TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(const TReadSessionGet if (!events.empty() && settings.Tx_) { auto& tx = settings.Tx_->get(); CbContext->TryGet()->CollectOffsets(tx, events, Client); + for (auto& event : events) { + SetReadInTransaction(event); + } } return events; } @@ -155,6 +165,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(const TReadSessionGetEv if (event && settings.Tx_) { auto& tx = settings.Tx_->get(); CbContext->TryGet()->CollectOffsets(tx, *event, Client); + SetReadInTransaction(*event); } return event; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp index 7d15ee10f9..6737c517f5 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp @@ -248,6 +248,10 @@ TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages, TVector<TComp } void TDataReceivedEvent::Commit() { + if (ReadInTransaction) { + ythrow yexception() << "Offsets cannot be commited explicitly when reading in a transaction"; + } + for (auto [from, to] : OffsetRanges) { static_cast<TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h b/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h index 62b5a22c0c..504b5c23aa 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h @@ -201,6 +201,10 @@ struct TReadSessionEvent { return CompressedMessages; } + void SetReadInTransaction() { + ReadInTransaction = true; + } + //! Commits all messages in batch. void Commit(); @@ -219,6 +223,7 @@ struct TReadSessionEvent { TVector<TMessage> Messages; TVector<TCompressedMessage> CompressedMessages; std::vector<std::pair<ui64, ui64>> OffsetRanges; + bool ReadInTransaction = false; }; //! Acknowledgement for commit request. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 4e55d34527..9ef3febe1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -61,7 +61,14 @@ protected: void StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset); void StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset); + struct TReadMessageSettings { + NTable::TTransaction& Tx; + bool CommitOffsets = false; + TMaybe<ui64> Offset; + }; + void ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset); + void ReadMessage(TTopicReadSessionPtr reader, const TReadMessageSettings& settings); void WriteMessage(const TString& message); void WriteMessages(const TVector<TString>& messages, @@ -305,8 +312,23 @@ void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset) void TFixture::ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset) { - auto event = ReadEvent<NTopic::TReadSessionEvent::TDataReceivedEvent>(reader, tx); - UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset); + TReadMessageSettings settings { + .Tx = tx, + .CommitOffsets = false, + .Offset = offset + }; + ReadMessage(reader, settings); +} + +void TFixture::ReadMessage(TTopicReadSessionPtr reader, const TReadMessageSettings& settings) +{ + auto event = ReadEvent<NTopic::TReadSessionEvent::TDataReceivedEvent>(reader, settings.Tx); + if (settings.Offset.Defined()) { + UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), *settings.Offset); + } + if (settings.CommitOffsets) { + event.Commit(); + } } template<class E> @@ -506,6 +528,19 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) CommitTx(tx1, EStatus::ABORTED); } +Y_UNIT_TEST_F(Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture) +{ + WriteMessage("message"); + + auto session = CreateTableSession(); + auto tx = BeginTx(session); + + auto reader = CreateReader(); + StartPartitionSession(reader, tx, 0); + + UNIT_ASSERT_EXCEPTION(ReadMessage(reader, {.Tx = tx, .CommitOffsets = true}), yexception); +} + //Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) //{ // WriteToTopicWithInvalidTxId(false); |