aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2024-09-26 13:11:56 +0300
committerGitHub <noreply@github.com>2024-09-26 10:11:56 +0000
commitcc0a3366ece6c8245f14660caad22780ce68a6cc (patch)
treef472e90e311c09a7dcf5a0ca61e95462948c53e5
parent71f2a5e97a48430f3625182c786aca3939b22267 (diff)
downloadydb-cc0a3366ece6c8245f14660caad22780ce68a6cc.tar.gz
The Commit call waits Acks (#9761)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp58
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h36
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/ya.make1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h22
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp23
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp18
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp145
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h23
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h16
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp219
-rw-r--r--ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp44
17 files changed, 503 insertions, 163 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
index 5b348395887..7194a541f00 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
@@ -155,10 +155,16 @@ public:
NThreading::TFuture<ui64> GetInitSeqNo() override {
return TryGetImpl()->GetInitSeqNo();
}
- void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override {
+ void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message, NTable::TTransaction* tx = nullptr) override {
+ if (tx) {
+ ythrow yexception() << "transactions are not supported";
+ }
TryGetImpl()->Write(std::move(continuationToken), std::move(message));
}
- void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override {
+ void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params, NTable::TTransaction* tx = nullptr) override {
+ if (tx) {
+ ythrow yexception() << "transactions are not supported";
+ }
TryGetImpl()->WriteEncoded(std::move(continuationToken), std::move(params));
}
void Write(NTopic::TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(),
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
index 4b9d482f604..03f802d2cb2 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
@@ -20,7 +20,6 @@ namespace NTable {
using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>;
-
class TSession::TImpl : public TKqpSessionCommon {
friend class TTableClient;
friend class TSession;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index 23006f7ff9f..9635c646424 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -646,12 +646,12 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio
return promise.GetFuture();
}
-TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx,
+TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TString& txId,
const TCommitTxSettings& settings)
{
auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings);
request.set_session_id(session.GetId());
- request.set_tx_id(tx.GetId());
+ request.set_tx_id(txId);
request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
auto promise = NewPromise<TCommitTransactionResult>();
@@ -684,12 +684,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess
return promise.GetFuture();
}
-TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx,
+TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TString& txId,
const TRollbackTxSettings& settings)
{
auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings);
request.set_session_id(session.GetId());
- request.set_tx_id(tx.GetId());
+ request.set_tx_id(txId);
return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
std::move(request),
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
index 8de98455558..2d20737210e 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -109,9 +109,9 @@ public:
TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings,
const TBeginTxSettings& settings);
- TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx,
+ TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TString& txId,
const TCommitTxSettings& settings);
- TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx,
+ TAsyncStatus RollbackTransaction(const TSession& session, const TString& txId,
const TRollbackTxSettings& settings);
TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query,
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp
new file mode 100644
index 00000000000..2c00e9ae9d4
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp
@@ -0,0 +1,58 @@
+#include "transaction.h"
+#include "table_client.h"
+
+namespace NYdb::NTable {
+
+TTransaction::TImpl::TImpl(const TSession& session, const TString& txId)
+ : Session_(session)
+ , TxId_(txId)
+{
+}
+
+TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings)
+{
+ ChangesAreAccepted = false;
+
+ auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
+
+ for (auto& callback : PrecommitCallbacks) {
+ auto action = [curr = callback()](const TAsyncStatus& prev) {
+ if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) {
+ return prev;
+ }
+
+ return curr;
+ };
+
+ result = result.Apply(action);
+ }
+
+ auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable {
+ if (const TStatus& status = result.GetValue(); !status.IsSuccess()) {
+ return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), Nothing()));
+ }
+
+ return Session_.Client_->CommitTransaction(Session_,
+ TxId_,
+ settings);
+ };
+
+ return result.Apply(precommitsCompleted);
+}
+
+TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings)
+{
+ ChangesAreAccepted = false;
+ return Session_.Client_->RollbackTransaction(Session_, TxId_, settings);
+}
+
+void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb)
+{
+ if (!ChangesAreAccepted) {
+ ythrow TContractViolation("Changes are no longer accepted");
+ }
+
+ PrecommitCallbacks.push_back(std::move(cb));
+}
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h
new file mode 100644
index 00000000000..12df59f8065
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NYdb::NTable {
+
+class TTransaction::TImpl {
+public:
+ TImpl(const TSession& session, const TString& txId);
+
+ const TString& GetId() const {
+ return TxId_;
+ }
+
+ bool IsActive() const {
+ return !TxId_.empty();
+ }
+
+ TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
+ TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
+
+ TSession GetSession() const {
+ return Session_;
+ }
+
+ void AddPrecommitCallback(TPrecommitTransactionCallback cb);
+
+private:
+ TSession Session_;
+ TString TxId_;
+
+ bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
+ TVector<TPrecommitTransactionCallback> PrecommitCallbacks;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make
index 58b9b082880..513829155f0 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make
@@ -6,6 +6,7 @@ SRCS(
readers.cpp
request_migrator.cpp
table_client.cpp
+ transaction.cpp
)
PEERDIR(
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 6764312c5dc..767964fc4d7 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -18,6 +18,7 @@
#include <ydb/public/sdk/cpp/client/ydb_table/impl/data_query.h>
#include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h>
#include <ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <google/protobuf/util/time_util.h>
@@ -1997,16 +1998,35 @@ TTxControl::TTxControl(const TTxSettings& begin)
////////////////////////////////////////////////////////////////////////////////
TTransaction::TTransaction(const TSession& session, const TString& txId)
- : Session_(session)
- , TxId_(txId)
+ : TransactionImpl_(new TTransaction::TImpl(session, txId))
{}
+const TString& TTransaction::GetId() const
+{
+ return TransactionImpl_->GetId();
+}
+
+bool TTransaction::IsActive() const
+{
+ return TransactionImpl_->IsActive();
+}
+
TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) {
- return Session_.Client_->CommitTransaction(Session_, *this, settings);
+ return TransactionImpl_->Commit(settings);
}
TAsyncStatus TTransaction::Rollback(const TRollbackTxSettings& settings) {
- return Session_.Client_->RollbackTransaction(Session_, *this, settings);
+ return TransactionImpl_->Rollback(settings);
+}
+
+TSession TTransaction::GetSession() const
+{
+ return TransactionImpl_->GetSession();
+}
+
+void TTransaction::AddPrecommitCallback(TPrecommitTransactionCallback cb)
+{
+ TransactionImpl_->AddPrecommitCallback(std::move(cb));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index fb94b201b5a..3b50f810f7e 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -1692,6 +1692,8 @@ struct TReadTableSettings : public TRequestSettings<TReadTableSettings> {
FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional);
};
+using TPrecommitTransactionCallback = std::function<TAsyncStatus ()>;
+
//! Represents all session operations
//! Session is transparent logic representation of connection
class TSession {
@@ -1829,26 +1831,22 @@ TAsyncStatus TTableClient::RetryOperation(
class TTransaction {
friend class TTableClient;
public:
- const TString& GetId() const {
- return TxId_;
- }
-
- bool IsActive() const {
- return !TxId_.empty();
- }
+ const TString& GetId() const;
+ bool IsActive() const;
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
- TSession GetSession() const {
- return Session_;
- }
+ TSession GetSession() const;
+
+ void AddPrecommitCallback(TPrecommitTransactionCallback cb);
private:
TTransaction(const TSession& session, const TString& txId);
- TSession Session_;
- TString TxId_;
+ class TImpl;
+
+ std::shared_ptr<TImpl> TransactionImpl_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index f3e462dbfa1..c2fa56effff 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -230,14 +230,21 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
Y_ABORT_UNLESS(!topics.empty());
- auto result = Client->UpdateOffsetsInTransaction(tx,
- topics,
- Settings.ConsumerName_,
- {}).GetValueSync();
- Y_ABORT_UNLESS(!result.IsTransportError());
-
- if (!result.IsSuccess()) {
- ythrow yexception() << "error on update offsets: " << result;
+ while (true) {
+ auto result = Client->UpdateOffsetsInTransaction(tx,
+ topics,
+ Settings.ConsumerName_,
+ {}).GetValueSync();
+ Y_ABORT_UNLESS(!result.IsTransportError());
+
+ if (result.GetStatus() != EStatus::SESSION_BUSY) {
+ if (!result.IsSuccess()) {
+ ythrow yexception() << "error on update offsets: " << result;
+ }
+ break;
+ }
+
+ Sleep(TDuration::MilliSeconds(1));
}
OffsetRanges.erase(std::make_pair(sessionId, txId));
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index 45192578b3a..a8e170a9074 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -45,8 +45,12 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, EC
TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
-void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message)
+void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message,
+ NTable::TTransaction* tx)
{
+ if (tx) {
+ message.Tx(*tx);
+ }
TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
@@ -60,7 +64,11 @@ void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui
TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
-void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) {
+void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message,
+ NTable::TTransaction* tx) {
+ if (tx) {
+ message.Tx(*tx);
+ }
TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
@@ -112,15 +120,15 @@ bool TSimpleBlockingWriteSession::Write(
auto message = TWriteMessage(std::move(data))
.SeqNo(seqNo)
.CreateTimestamp(createTimestamp);
- return Write(std::move(message), blockTimeout);
+ return Write(std::move(message), nullptr, blockTimeout);
}
bool TSimpleBlockingWriteSession::Write(
- TWriteMessage&& message, const TDuration& blockTimeout
+ TWriteMessage&& message, NTable::TTransaction* tx, const TDuration& blockTimeout
) {
auto continuationToken = WaitForToken(blockTimeout);
if (continuationToken.Defined()) {
- Writer->Write(std::move(*continuationToken), std::move(message));
+ Writer->Write(std::move(*continuationToken), std::move(message), tx);
return true;
}
return false;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
index 658efe86595..f430ae54a3c 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -36,9 +36,11 @@ public:
void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
- void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+ void Write(TContinuationToken&& continuationToken, TWriteMessage&& message,
+ NTable::TTransaction* tx = nullptr) override;
- void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+ void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message,
+ NTable::TTransaction* tx = nullptr) override;
NThreading::TFuture<void> WaitEvent() override;
@@ -67,7 +69,9 @@ public:
bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(),
const TDuration& blockTimeout = TDuration::Max()) override;
- bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) override;
+ bool Write(TWriteMessage&& message,
+ NTable::TTransaction* tx = nullptr,
+ const TDuration& blockTimeout = TDuration::Max()) override;
ui64 GetInitSeqNo() override;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 929c00cb1e1..67916a10b6b 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -507,13 +507,131 @@ NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() {
return EventsQueue->WaitEvent();
}
+void TWriteSessionImpl::OnTransactionCommit()
+{
+}
+
+TStatus MakeStatus(EStatus code, NYql::TIssues&& issues)
+{
+ return {code, std::move(issues)};
+}
+
+TStatus MakeSessionExpiredError()
+{
+ return MakeStatus(EStatus::SESSION_EXPIRED, {});
+}
+
+TStatus MakeCommitTransactionSuccess()
+{
+ return MakeStatus(EStatus::SUCCESS, {});
+}
+
+std::pair<TString, TString> MakeTransactionId(const TTransaction& tx)
+{
+ return {tx.GetSession().GetId(), tx.GetId()};
+}
+
+void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransaction* tx)
+{
+ if (!tx) {
+ return;
+ }
+
+ const TTransactionId txId = MakeTransactionId(*tx);
+ TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId);
+
+ with_lock(txInfo->Lock) {
+ if (txInfo->Subscribed) {
+ return;
+ }
+
+ txInfo->IsActive = true;
+ txInfo->Subscribed = true;
+ txInfo->AllAcksReceived = NThreading::NewPromise<TStatus>();
+ }
+
+ auto callback = [txInfo]() {
+ with_lock(txInfo->Lock) {
+ txInfo->CommitCalled = true;
+
+ if (txInfo->WriteCount == txInfo->AckCount) {
+ txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
+ return txInfo->AllAcksReceived.GetFuture();
+ }
+
+ if (txInfo->IsActive) {
+ return txInfo->AllAcksReceived.GetFuture();
+ }
+
+ return NThreading::MakeFuture(MakeSessionExpiredError());
+ }
+ };
+
+ tx->AddPrecommitCallback(std::move(callback));
+}
+
+void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)
+{
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ auto p = WrittenInTx.find(seqNo);
+ if (p == WrittenInTx.end()) {
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
+ LogPrefix() << "OnAck: seqno=" << seqNo << ", txId=?");
+ return;
+ }
+
+ const TTransactionId& txId = p->second;
+ TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId);
+
+ with_lock(txInfo->Lock) {
+ ++txInfo->AckCount;
+
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
+ LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << txId.second<< ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
+
+ if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) {
+ txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
+
+ Txs.erase(txId);
+ }
+ }
+}
+
+auto TWriteSessionImpl::GetOrCreateTxInfo(const TTransactionId& txId) -> TTransactionInfoPtr
+{
+ auto p = Txs.find(txId);
+ if (p == Txs.end()) {
+ TTransactionInfoPtr& txInfo = Txs[txId];
+ txInfo = std::make_shared<TTransactionInfo>();
+ txInfo->Subscribed = false;
+ txInfo->CommitCalled = false;
+ p = Txs.find(txId);
+ }
+ return p->second;
+}
+
void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) {
TInstant createdAtValue = message.CreateTimestamp_.Defined() ? *message.CreateTimestamp_ : TInstant::Now();
bool readyToAccept = false;
size_t bufferSize = message.Data.size();
with_lock(Lock) {
+ TrySubscribeOnTransactionCommit(message.GetTxPtr());
+
+ ui64 seqNo = GetNextIdImpl(message.SeqNo_);
+
+ if (message.GetTxPtr()) {
+ const auto& txId = MakeTransactionId(*message.GetTxPtr());
+ TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId);
+ ++txInfo->WriteCount;
+ WrittenInTx[seqNo] = txId;
+
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
+ LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << txId.second << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
+ }
+
CurrentBatch.Add(
- GetNextIdImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize,
+ seqNo, createdAtValue, message.Data, message.Codec, message.OriginalSize,
message.MessageMeta_,
message.GetTxPtr()
);
@@ -979,10 +1097,12 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
for (const auto& ack : batchWriteResponse.acks()) {
// TODO: Fill writer statistics
- ui64 sequenceNumber = ack.seq_no();
+ ui64 msgId = GetIdImpl(ack.seq_no());
Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx());
+ TrySignalAllAcksReceived(msgId);
+
TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus;
if (ack.has_written_in_tx()) {
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX;
@@ -998,7 +1118,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
ui64 offset = ack.has_written() ? ack.written().offset() : 0;
acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{
- GetIdImpl(sequenceNumber),
+ msgId,
msgWriteStatus,
TWriteSessionEvent::TWriteAck::TWrittenMessageDetails {
offset,
@@ -1007,7 +1127,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
writeStat,
});
- if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) {
+ if (CleanupOnAcknowledged(msgId)) {
result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
@@ -1065,6 +1185,9 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) {
(*Counters->BytesInflightTotal) = MemoryUsage;
SentOriginalMessages.pop();
+
+ WrittenInTx.erase(id);
+
return result;
}
@@ -1550,10 +1673,22 @@ void TWriteSessionImpl::AbortImpl() {
Cancel(ConnectDelayContext);
if (Processor)
Processor->Cancel();
-
Cancel(ClientContext);
ClientContext.reset(); // removes context from contexts set from underlying gRPC-client.
+ CancelTransactions();
+ }
+}
+
+void TWriteSessionImpl::CancelTransactions()
+{
+ for (auto& [_, txInfo] : Txs) {
+ txInfo->IsActive = false;
+ if (txInfo->WriteCount != txInfo->AckCount) {
+ txInfo->AllAcksReceived.SetValue(MakeSessionExpiredError());
+ }
}
+
+ Txs.clear();
}
void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 74e47b15af4..2dfaa84b4c0 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -303,6 +303,19 @@ private:
i64 Generation;
};
+ struct TTransactionInfo {
+ TSpinLock Lock;
+ bool IsActive = false;
+ bool Subscribed = false;
+ NThreading::TPromise<TStatus> AllAcksReceived;
+ bool CommitCalled = false;
+ ui64 WriteCount = 0;
+ ui64 AckCount = 0;
+ };
+
+ using TTransactionId = std::pair<TString, TString>; // SessionId, TxId
+ using TTransactionInfoPtr = std::shared_ptr<TTransactionInfo>;
+
THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action
public:
TWriteSessionImpl(const TWriteSessionSettings& settings,
@@ -406,6 +419,13 @@ private:
bool TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRequest* writeRequest) const;
+ void TrySubscribeOnTransactionCommit(TTransaction* tx);
+ void CancelTransactions();
+ TTransactionInfoPtr GetOrCreateTxInfo(const TTransactionId& txId);
+ void TrySignalAllAcksReceived(ui64 seqNo);
+
+ void OnTransactionCommit();
+
private:
TWriteSessionSettings Settings;
std::shared_ptr<TTopicClient::TImpl> Client;
@@ -467,6 +487,9 @@ private:
TMaybe<ui64> DirectWriteToPartitionId;
protected:
ui64 MessagesAcquired = 0;
+
+ THashMap<TTransactionId, TTransactionInfoPtr> Txs;
+ THashMap<ui64, TTransactionId> WrittenInTx; // SeqNo -> TxId
};
} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h
index eb7587dcc3e..e2b3301b2b8 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h
@@ -17,6 +17,8 @@ namespace NYdb::NTable {
namespace NYdb::NTopic {
+using TTransaction = NTable::TTransaction;
+
//! Settings for write session.
struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
using TSelf = TWriteSessionSettings;
@@ -190,9 +192,9 @@ public:
FLUENT_SETTING(TMessageMeta, MessageMeta);
//! Transaction id
- FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx);
+ FLUENT_SETTING_OPTIONAL(std::reference_wrapper<TTransaction>, Tx);
- const NTable::TTransaction* GetTxPtr() const
+ TTransaction* GetTxPtr() const
{
return Tx_ ? &Tx_->get() : nullptr;
}
@@ -204,7 +206,9 @@ public:
//! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded;
//! return - true if write succeeded, false if message was not enqueued for write within blockTimeout.
//! no Ack is provided.
- virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0;
+ virtual bool Write(TWriteMessage&& message,
+ NTable::TTransaction* tx = nullptr,
+ const TDuration& blockTimeout = TDuration::Max()) = 0;
//! Write single message. Deprecated method with only basic message options.
@@ -249,7 +253,8 @@ public:
//! Write single message.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
- virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0;
+ virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message,
+ NTable::TTransaction* tx = nullptr) = 0;
//! Write single message. Old method with only basic message options.
virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(),
@@ -257,7 +262,8 @@ public:
//! Write single message that is already coded by codec.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
- virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0;
+ virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params,
+ NTable::TTransaction* tx = nullptr) = 0;
//! Write single message that is already compressed by codec. Old method with only basic message options.
virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index 3c36b11a325..4b529bfaaad 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -109,7 +109,8 @@ protected:
const TString& messageGroupId,
NYdb::EStatus status);
void CloseTopicWriteSession(const TString& topicPath,
- const TString& messageGroupId);
+ const TString& messageGroupId,
+ bool force = false);
void CloseTopicReadSession(const TString& topicPath,
const TString& consumerName);
@@ -243,21 +244,38 @@ NTable::TSession TFixture::CreateTableSession()
NTable::TTransaction TFixture::BeginTx(NTable::TSession& session)
{
- auto result = session.BeginTransaction().ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- return result.GetTransaction();
+ while (true) {
+ auto result = session.BeginTransaction().ExtractValueSync();
+ if (result.GetStatus() != EStatus::SESSION_BUSY) {
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ return result.GetTransaction();
+ }
+ Sleep(TDuration::MilliSeconds(100));
+ }
}
void TFixture::CommitTx(NTable::TTransaction& tx, EStatus status)
{
- auto result = tx.Commit().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());
+ while (true) {
+ auto result = tx.Commit().ExtractValueSync();
+ if (result.GetStatus() != EStatus::SESSION_BUSY) {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());
+ return;
+ }
+ Sleep(TDuration::MilliSeconds(100));
+ }
}
void TFixture::RollbackTx(NTable::TTransaction& tx, EStatus status)
{
- auto result = tx.Rollback().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());
+ while (true) {
+ auto result = tx.Rollback().ExtractValueSync();
+ if (result.GetStatus() != EStatus::SESSION_BUSY) {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());
+ return;
+ }
+ Sleep(TDuration::MilliSeconds(100));
+ }
}
auto TFixture::CreateReader() -> TTopicReadSessionPtr
@@ -490,15 +508,15 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
CommitTx(tx1, EStatus::ABORTED);
}
-Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture)
-{
- WriteToTopicWithInvalidTxId(false);
-}
-
-Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture)
-{
- WriteToTopicWithInvalidTxId(true);
-}
+//Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture)
+//{
+// WriteToTopicWithInvalidTxId(false);
+//}
+//
+//Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture)
+//{
+// WriteToTopicWithInvalidTxId(true);
+//}
Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture)
{
@@ -694,7 +712,8 @@ void TFixture::TTopicWriteSessionContext::Write(const TString& message, NTable::
}
void TFixture::CloseTopicWriteSession(const TString& topicPath,
- const TString& messageGroupId)
+ const TString& messageGroupId,
+ bool force)
{
std::pair<TString, TString> key(topicPath, messageGroupId);
auto i = TopicWriteSessions.find(key);
@@ -703,7 +722,7 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
TTopicWriteSessionContext& context = i->second;
- context.Session->Close();
+ context.Session->Close(force ? TDuration::MilliSeconds(0) : TDuration::Max());
TopicWriteSessions.erase(key);
}
@@ -972,9 +991,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture)
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #8", &tx);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #9", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
{
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
@@ -1022,11 +1038,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture)
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #8", &tx);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #9", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID_1);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID_2);
-
{
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
@@ -1069,14 +1080,9 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3");
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
- {
- auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3");
- }
+ auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3");
CommitTx(tx, EStatus::ABORTED);
@@ -1085,10 +1091,15 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx, EStatus::SUCCESS);
+
+ messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
+
+ messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2");
}
Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture)
@@ -1102,41 +1113,27 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
NTable::TTransaction tx_2 = BeginTx(tableSession);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
- {
- auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
- }
+ auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
- {
- auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
- }
+ messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
CommitTx(tx_2, EStatus::SUCCESS);
CommitTx(tx_1, EStatus::ABORTED);
- {
- auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3");
- }
+ messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3");
- {
- auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4");
- }
+ messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4");
}
Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture)
@@ -1152,9 +1149,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx_1, EStatus::SUCCESS);
}
@@ -1164,9 +1158,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx_2, EStatus::SUCCESS);
}
@@ -1195,8 +1186,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
{
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
@@ -1230,9 +1219,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #5", &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #6", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);
-
{
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
@@ -1261,8 +1247,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2");
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
{
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
@@ -1275,8 +1259,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx, EStatus::SUCCESS);
{
@@ -1295,14 +1277,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_9, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
NTable::TTransaction tx_2 = BeginTx(tableSession);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx_2);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
{
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
@@ -1329,8 +1307,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx_1, EStatus::SUCCESS);
}
@@ -1339,8 +1315,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx_2);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx_2, EStatus::SUCCESS);
}
@@ -1534,6 +1508,7 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD
for (auto& topic : d.Topics) {
WriteToTopic(topic, TEST_MESSAGE_GROUP_ID, "message", &tx);
+ // TODO: нужен callback для RollbakTx
WaitForAcks(topic, TEST_MESSAGE_GROUP_ID);
}
@@ -1554,6 +1529,10 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD
for (auto& topic : d.Topics) {
CheckTabletKeys(topic);
}
+
+ for (auto& topic : d.Topics) {
+ CloseTopicWriteSession(topic, TEST_MESSAGE_GROUP_ID);
+ }
}
Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture)
@@ -1610,27 +1589,30 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_14, TFixture)
CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
CommitTx(tx, EStatus::ABORTED);
}
Y_UNIT_TEST_F(WriteToTopic_Demo_15, TFixture)
{
+ // the session of writing to the topic can be closed before the commit
CreateTopic("topic_A");
NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #1", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_1);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, "message #2", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);
CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_2);
CommitTx(tx, EStatus::SUCCESS);
+
+ auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
+ UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2");
}
Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture)
@@ -1643,8 +1625,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
RestartPQTablet("topic_A", 0);
CommitTx(tx, EStatus::SUCCESS);
@@ -1672,8 +1652,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_17, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(20'000'000, 'x'), &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString( 7'000'000, 'x'), &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx, EStatus::SUCCESS);
//RestartPQTablet("topic_A", 0);
@@ -1703,19 +1681,16 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
for (size_t i = 0; i < params.OldHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++oldHeadMsgCount;
}
for (size_t i = 0; i < params.BigBlobsCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++bigBlobMsgCount;
}
for (size_t i = 0; i < params.NewHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++newHeadMsgCount;
}
@@ -1879,8 +1854,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture)
WriteToTable("table_A", records, &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx, EStatus::SUCCESS);
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
@@ -1914,8 +1887,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture)
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, m, &tx);
}
- WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx, EStatus::SUCCESS);
messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
@@ -1947,8 +1918,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, m, &tx, PARTITION_1);
}
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
CommitTx(tx, EStatus::SUCCESS);
messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, PARTITION_1);
@@ -1961,7 +1930,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
CreateTopic("topic_B", TEST_CONSUMER);
CreateTopic("topic_C", TEST_CONSUMER);
- for (size_t i = 0, writtenInTx = 0; i < 2; ++i) {
+ for (size_t i = 0; i < 2; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0);
@@ -1971,14 +1940,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
- ++writtenInTx;
- WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx);
messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
- ++writtenInTx;
- WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx);
CommitTx(tx, EStatus::SUCCESS);
@@ -2003,12 +1968,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
TString message(16'000, 'a');
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(16'000, 'a'), &tx, 0);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
CommitTx(tx, EStatus::SUCCESS);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(20'000, 'b'), nullptr, 0);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
@@ -2023,12 +1986,10 @@ void TFixture::WriteMessagesInTx(size_t big, size_t small)
for (size_t i = 0; i < big; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}
for (size_t i = 0; i < small; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}
CommitTx(tx, EStatus::SUCCESS);
@@ -2108,8 +2069,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
AddConsumer("topic_A", {"consumer"});
CommitTx(tx, EStatus::SUCCESS);
@@ -2153,6 +2112,42 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
}
+Y_UNIT_TEST_F(WriteToTopic_Demo_40, TFixture)
+{
+ // The recording stream will run into a quota. Before the commit, the client will receive confirmations
+ // for some of the messages. The `CommitTx` call will wait for the rest.
+ CreateTopic("topic_A", TEST_CONSUMER);
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ for (size_t k = 0; k < 100; ++k) {
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx);
+ }
+
+ CommitTx(tx, EStatus::SUCCESS);
+
+ auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60));
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_41, TFixture)
+{
+ // If the recording session does not wait for confirmations, the commit will fail
+ CreateTopic("topic_A", TEST_CONSUMER);
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ for (size_t k = 0; k < 100; ++k) {
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx);
+ }
+
+ CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID, true); // force close
+
+ CommitTx(tx, EStatus::SESSION_EXPIRED);
+}
+
}
}
diff --git a/ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp b/ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp
new file mode 100644
index 00000000000..77030e2e2fd
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp
@@ -0,0 +1,44 @@
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+void ThrowOnError(const NYdb::TStatus& status)
+{
+ if (status.IsSuccess()) {
+ return;
+ }
+
+ ythrow yexception() << status;
+}
+
+int main()
+{
+ const TString ENDPOINT = "HOST:PORT";
+ const TString DATABASE = "DATABASE";
+ const TString TOPIC = "PATH/TO/TOPIC";
+
+ NYdb::TDriverConfig config;
+ config.SetEndpoint(ENDPOINT);
+ config.SetDatabase(DATABASE);
+ NYdb::TDriver driver(config);
+
+ NYdb::NTable::TTableClient tableClient(driver);
+ auto getTableSessionResult = tableClient.GetSession().GetValueSync();
+ ThrowOnError(getTableSessionResult);
+ auto tableSession = getTableSessionResult.GetSession();
+
+ NYdb::NTopic::TTopicClient topicClient(driver);
+ auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings()
+ .Path(TOPIC)
+ .DeduplicationEnabled(true);
+ auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings);
+
+ auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync();
+ ThrowOnError(beginTransactionResult);
+ auto transaction = beginTransactionResult.GetTransaction();
+
+ NYdb::NTopic::TWriteMessage writeMessage("message");
+
+ topicSession->Write(std::move(writeMessage), &transaction);
+
+ transaction.Commit().GetValueSync();
+}