aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2024-10-16 15:28:51 +0300
committerGitHub <noreply@github.com>2024-10-16 15:28:51 +0300
commitb2bc73df8676a727babfe3c63bf9e357a48ad2d3 (patch)
tree1ba04255fcfc6efac6e3d75f241a5f959b14c758
parent30ebe5357bb143648c6be4d151ecd4944af81ada (diff)
downloadydb-b2bc73df8676a727babfe3c63bf9e357a48ad2d3.tar.gz
Commit offsets in a transaction and out of a transaction (#10460)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp39
4 files changed, 57 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index 3f75d24139..3916bb0818 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -11,6 +11,13 @@ namespace NYdb::NTopic {
static const TString DRIVER_IS_STOPPING_DESCRIPTION = "Driver is stopping";
+void SetReadInTransaction(TReadSessionEvent::TEvent& event)
+{
+ if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
+ e->SetReadInTransaction();
+ }
+}
+
TReadSession::TReadSession(const TReadSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
@@ -137,6 +144,9 @@ TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(const TReadSessionGet
if (!events.empty() && settings.Tx_) {
auto& tx = settings.Tx_->get();
CbContext->TryGet()->CollectOffsets(tx, events, Client);
+ for (auto& event : events) {
+ SetReadInTransaction(event);
+ }
}
return events;
}
@@ -155,6 +165,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(const TReadSessionGetEv
if (event && settings.Tx_) {
auto& tx = settings.Tx_->get();
CbContext->TryGet()->CollectOffsets(tx, *event, Client);
+ SetReadInTransaction(*event);
}
return event;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
index 7d15ee10f9..6737c517f5 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
@@ -248,6 +248,10 @@ TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages, TVector<TComp
}
void TDataReceivedEvent::Commit() {
+ if (ReadInTransaction) {
+ ythrow yexception() << "Offsets cannot be commited explicitly when reading in a transaction";
+ }
+
for (auto [from, to] : OffsetRanges) {
static_cast<TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h b/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h
index 62b5a22c0c..504b5c23aa 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h
@@ -201,6 +201,10 @@ struct TReadSessionEvent {
return CompressedMessages;
}
+ void SetReadInTransaction() {
+ ReadInTransaction = true;
+ }
+
//! Commits all messages in batch.
void Commit();
@@ -219,6 +223,7 @@ struct TReadSessionEvent {
TVector<TMessage> Messages;
TVector<TCompressedMessage> CompressedMessages;
std::vector<std::pair<ui64, ui64>> OffsetRanges;
+ bool ReadInTransaction = false;
};
//! Acknowledgement for commit request.
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index 4e55d34527..9ef3febe1e 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -61,7 +61,14 @@ protected:
void StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset);
void StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset);
+ struct TReadMessageSettings {
+ NTable::TTransaction& Tx;
+ bool CommitOffsets = false;
+ TMaybe<ui64> Offset;
+ };
+
void ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset);
+ void ReadMessage(TTopicReadSessionPtr reader, const TReadMessageSettings& settings);
void WriteMessage(const TString& message);
void WriteMessages(const TVector<TString>& messages,
@@ -305,8 +312,23 @@ void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset)
void TFixture::ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset)
{
- auto event = ReadEvent<NTopic::TReadSessionEvent::TDataReceivedEvent>(reader, tx);
- UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset);
+ TReadMessageSettings settings {
+ .Tx = tx,
+ .CommitOffsets = false,
+ .Offset = offset
+ };
+ ReadMessage(reader, settings);
+}
+
+void TFixture::ReadMessage(TTopicReadSessionPtr reader, const TReadMessageSettings& settings)
+{
+ auto event = ReadEvent<NTopic::TReadSessionEvent::TDataReceivedEvent>(reader, settings.Tx);
+ if (settings.Offset.Defined()) {
+ UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), *settings.Offset);
+ }
+ if (settings.CommitOffsets) {
+ event.Commit();
+ }
}
template<class E>
@@ -506,6 +528,19 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
CommitTx(tx1, EStatus::ABORTED);
}
+Y_UNIT_TEST_F(Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture)
+{
+ WriteMessage("message");
+
+ auto session = CreateTableSession();
+ auto tx = BeginTx(session);
+
+ auto reader = CreateReader();
+ StartPartitionSession(reader, tx, 0);
+
+ UNIT_ASSERT_EXCEPTION(ReadMessage(reader, {.Tx = tx, .CommitOffsets = true}), yexception);
+}
+
//Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture)
//{
// WriteToTopicWithInvalidTxId(false);