diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2024-09-26 13:11:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-26 10:11:56 +0000 |
commit | cc0a3366ece6c8245f14660caad22780ce68a6cc (patch) | |
tree | f472e90e311c09a7dcf5a0ca61e95462948c53e5 | |
parent | 71f2a5e97a48430f3625182c786aca3939b22267 (diff) | |
download | ydb-cc0a3366ece6c8245f14660caad22780ce68a6cc.tar.gz |
The Commit call waits Acks (#9761)
17 files changed, 503 insertions, 163 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h index 5b348395887..7194a541f00 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h @@ -155,10 +155,16 @@ public: NThreading::TFuture<ui64> GetInitSeqNo() override { return TryGetImpl()->GetInitSeqNo(); } - void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override { + void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message, NTable::TTransaction* tx = nullptr) override { + if (tx) { + ythrow yexception() << "transactions are not supported"; + } TryGetImpl()->Write(std::move(continuationToken), std::move(message)); } - void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override { + void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params, NTable::TTransaction* tx = nullptr) override { + if (tx) { + ythrow yexception() << "transactions are not supported"; + } TryGetImpl()->WriteEncoded(std::move(continuationToken), std::move(params)); } void Write(NTopic::TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h index 4b9d482f604..03f802d2cb2 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h @@ -20,7 +20,6 @@ namespace NTable { using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>; - class TSession::TImpl : public TKqpSessionCommon { friend class TTableClient; friend class TSession; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 23006f7ff9f..9635c646424 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -646,12 +646,12 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio return promise.GetFuture(); } -TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx, +TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TString& txId, const TCommitTxSettings& settings) { auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings); request.set_session_id(session.GetId()); - request.set_tx_id(tx.GetId()); + request.set_tx_id(txId); request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); auto promise = NewPromise<TCommitTransactionResult>(); @@ -684,12 +684,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess return promise.GetFuture(); } -TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx, +TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TString& txId, const TRollbackTxSettings& settings) { auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings); request.set_session_id(session.GetId()); - request.set_tx_id(tx.GetId()); + request.set_tx_id(txId); return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>( std::move(request), diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h index 8de98455558..2d20737210e 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -109,9 +109,9 @@ public: TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings, const TBeginTxSettings& settings); - TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx, + TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TString& txId, const TCommitTxSettings& settings); - TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx, + TAsyncStatus RollbackTransaction(const TSession& session, const TString& txId, const TRollbackTxSettings& settings); TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query, diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp new file mode 100644 index 00000000000..2c00e9ae9d4 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp @@ -0,0 +1,58 @@ +#include "transaction.h" +#include "table_client.h" + +namespace NYdb::NTable { + +TTransaction::TImpl::TImpl(const TSession& session, const TString& txId) + : Session_(session) + , TxId_(txId) +{ +} + +TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings) +{ + ChangesAreAccepted = false; + + auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {})); + + for (auto& callback : PrecommitCallbacks) { + auto action = [curr = callback()](const TAsyncStatus& prev) { + if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) { + return prev; + } + + return curr; + }; + + result = result.Apply(action); + } + + auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable { + if (const TStatus& status = result.GetValue(); !status.IsSuccess()) { + return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), Nothing())); + } + + return Session_.Client_->CommitTransaction(Session_, + TxId_, + settings); + }; + + return result.Apply(precommitsCompleted); +} + +TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings) +{ + ChangesAreAccepted = false; + return Session_.Client_->RollbackTransaction(Session_, TxId_, settings); +} + +void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb) +{ + if (!ChangesAreAccepted) { + ythrow TContractViolation("Changes are no longer accepted"); + } + + PrecommitCallbacks.push_back(std::move(cb)); +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h new file mode 100644 index 00000000000..12df59f8065 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h @@ -0,0 +1,36 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NYdb::NTable { + +class TTransaction::TImpl { +public: + TImpl(const TSession& session, const TString& txId); + + const TString& GetId() const { + return TxId_; + } + + bool IsActive() const { + return !TxId_.empty(); + } + + TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings()); + TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings()); + + TSession GetSession() const { + return Session_; + } + + void AddPrecommitCallback(TPrecommitTransactionCallback cb); + +private: + TSession Session_; + TString TxId_; + + bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet + TVector<TPrecommitTransactionCallback> PrecommitCallbacks; +}; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make index 58b9b082880..513829155f0 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/ya.make @@ -6,6 +6,7 @@ SRCS( readers.cpp request_migrator.cpp table_client.cpp + transaction.cpp ) PEERDIR( diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 6764312c5dc..767964fc4d7 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -18,6 +18,7 @@ #include <ydb/public/sdk/cpp/client/ydb_table/impl/data_query.h> #include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h> #include <ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h> +#include <ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h> #include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> #include <google/protobuf/util/time_util.h> @@ -1997,16 +1998,35 @@ TTxControl::TTxControl(const TTxSettings& begin) //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction(const TSession& session, const TString& txId) - : Session_(session) - , TxId_(txId) + : TransactionImpl_(new TTransaction::TImpl(session, txId)) {} +const TString& TTransaction::GetId() const +{ + return TransactionImpl_->GetId(); +} + +bool TTransaction::IsActive() const +{ + return TransactionImpl_->IsActive(); +} + TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) { - return Session_.Client_->CommitTransaction(Session_, *this, settings); + return TransactionImpl_->Commit(settings); } TAsyncStatus TTransaction::Rollback(const TRollbackTxSettings& settings) { - return Session_.Client_->RollbackTransaction(Session_, *this, settings); + return TransactionImpl_->Rollback(settings); +} + +TSession TTransaction::GetSession() const +{ + return TransactionImpl_->GetSession(); +} + +void TTransaction::AddPrecommitCallback(TPrecommitTransactionCallback cb) +{ + TransactionImpl_->AddPrecommitCallback(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index fb94b201b5a..3b50f810f7e 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -1692,6 +1692,8 @@ struct TReadTableSettings : public TRequestSettings<TReadTableSettings> { FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional); }; +using TPrecommitTransactionCallback = std::function<TAsyncStatus ()>; + //! Represents all session operations //! Session is transparent logic representation of connection class TSession { @@ -1829,26 +1831,22 @@ TAsyncStatus TTableClient::RetryOperation( class TTransaction { friend class TTableClient; public: - const TString& GetId() const { - return TxId_; - } - - bool IsActive() const { - return !TxId_.empty(); - } + const TString& GetId() const; + bool IsActive() const; TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings()); TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings()); - TSession GetSession() const { - return Session_; - } + TSession GetSession() const; + + void AddPrecommitCallback(TPrecommitTransactionCallback cb); private: TTransaction(const TSession& session, const TString& txId); - TSession Session_; - TString TxId_; + class TImpl; + + std::shared_ptr<TImpl> TransactionImpl_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index f3e462dbfa1..c2fa56effff 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -230,14 +230,21 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx) Y_ABORT_UNLESS(!topics.empty()); - auto result = Client->UpdateOffsetsInTransaction(tx, - topics, - Settings.ConsumerName_, - {}).GetValueSync(); - Y_ABORT_UNLESS(!result.IsTransportError()); - - if (!result.IsSuccess()) { - ythrow yexception() << "error on update offsets: " << result; + while (true) { + auto result = Client->UpdateOffsetsInTransaction(tx, + topics, + Settings.ConsumerName_, + {}).GetValueSync(); + Y_ABORT_UNLESS(!result.IsTransportError()); + + if (result.GetStatus() != EStatus::SESSION_BUSY) { + if (!result.IsSuccess()) { + ythrow yexception() << "error on update offsets: " << result; + } + break; + } + + Sleep(TDuration::MilliSeconds(1)); } OffsetRanges.erase(std::make_pair(sessionId, txId)); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index 45192578b3a..a8e170a9074 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -45,8 +45,12 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, EC TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } -void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message) +void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message, + NTable::TTransaction* tx) { + if (tx) { + message.Tx(*tx); + } TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } @@ -60,7 +64,11 @@ void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } -void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) { +void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message, + NTable::TTransaction* tx) { + if (tx) { + message.Tx(*tx); + } TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } @@ -112,15 +120,15 @@ bool TSimpleBlockingWriteSession::Write( auto message = TWriteMessage(std::move(data)) .SeqNo(seqNo) .CreateTimestamp(createTimestamp); - return Write(std::move(message), blockTimeout); + return Write(std::move(message), nullptr, blockTimeout); } bool TSimpleBlockingWriteSession::Write( - TWriteMessage&& message, const TDuration& blockTimeout + TWriteMessage&& message, NTable::TTransaction* tx, const TDuration& blockTimeout ) { auto continuationToken = WaitForToken(blockTimeout); if (continuationToken.Defined()) { - Writer->Write(std::move(*continuationToken), std::move(message)); + Writer->Write(std::move(*continuationToken), std::move(message), tx); return true; } return false; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index 658efe86595..f430ae54a3c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -36,9 +36,11 @@ public: void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; - void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void Write(TContinuationToken&& continuationToken, TWriteMessage&& message, + NTable::TTransaction* tx = nullptr) override; - void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message, + NTable::TTransaction* tx = nullptr) override; NThreading::TFuture<void> WaitEvent() override; @@ -67,7 +69,9 @@ public: bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(), const TDuration& blockTimeout = TDuration::Max()) override; - bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) override; + bool Write(TWriteMessage&& message, + NTable::TTransaction* tx = nullptr, + const TDuration& blockTimeout = TDuration::Max()) override; ui64 GetInitSeqNo() override; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 929c00cb1e1..67916a10b6b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -507,13 +507,131 @@ NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() { return EventsQueue->WaitEvent(); } +void TWriteSessionImpl::OnTransactionCommit() +{ +} + +TStatus MakeStatus(EStatus code, NYql::TIssues&& issues) +{ + return {code, std::move(issues)}; +} + +TStatus MakeSessionExpiredError() +{ + return MakeStatus(EStatus::SESSION_EXPIRED, {}); +} + +TStatus MakeCommitTransactionSuccess() +{ + return MakeStatus(EStatus::SUCCESS, {}); +} + +std::pair<TString, TString> MakeTransactionId(const TTransaction& tx) +{ + return {tx.GetSession().GetId(), tx.GetId()}; +} + +void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransaction* tx) +{ + if (!tx) { + return; + } + + const TTransactionId txId = MakeTransactionId(*tx); + TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); + + with_lock(txInfo->Lock) { + if (txInfo->Subscribed) { + return; + } + + txInfo->IsActive = true; + txInfo->Subscribed = true; + txInfo->AllAcksReceived = NThreading::NewPromise<TStatus>(); + } + + auto callback = [txInfo]() { + with_lock(txInfo->Lock) { + txInfo->CommitCalled = true; + + if (txInfo->WriteCount == txInfo->AckCount) { + txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); + return txInfo->AllAcksReceived.GetFuture(); + } + + if (txInfo->IsActive) { + return txInfo->AllAcksReceived.GetFuture(); + } + + return NThreading::MakeFuture(MakeSessionExpiredError()); + } + }; + + tx->AddPrecommitCallback(std::move(callback)); +} + +void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) +{ + Y_ABORT_UNLESS(Lock.IsLocked()); + + auto p = WrittenInTx.find(seqNo); + if (p == WrittenInTx.end()) { + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnAck: seqno=" << seqNo << ", txId=?"); + return; + } + + const TTransactionId& txId = p->second; + TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); + + with_lock(txInfo->Lock) { + ++txInfo->AckCount; + + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << txId.second<< ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + + if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) { + txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); + + Txs.erase(txId); + } + } +} + +auto TWriteSessionImpl::GetOrCreateTxInfo(const TTransactionId& txId) -> TTransactionInfoPtr +{ + auto p = Txs.find(txId); + if (p == Txs.end()) { + TTransactionInfoPtr& txInfo = Txs[txId]; + txInfo = std::make_shared<TTransactionInfo>(); + txInfo->Subscribed = false; + txInfo->CommitCalled = false; + p = Txs.find(txId); + } + return p->second; +} + void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) { TInstant createdAtValue = message.CreateTimestamp_.Defined() ? *message.CreateTimestamp_ : TInstant::Now(); bool readyToAccept = false; size_t bufferSize = message.Data.size(); with_lock(Lock) { + TrySubscribeOnTransactionCommit(message.GetTxPtr()); + + ui64 seqNo = GetNextIdImpl(message.SeqNo_); + + if (message.GetTxPtr()) { + const auto& txId = MakeTransactionId(*message.GetTxPtr()); + TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); + ++txInfo->WriteCount; + WrittenInTx[seqNo] = txId; + + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << txId.second << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + } + CurrentBatch.Add( - GetNextIdImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize, + seqNo, createdAtValue, message.Data, message.Codec, message.OriginalSize, message.MessageMeta_, message.GetTxPtr() ); @@ -979,10 +1097,12 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess for (const auto& ack : batchWriteResponse.acks()) { // TODO: Fill writer statistics - ui64 sequenceNumber = ack.seq_no(); + ui64 msgId = GetIdImpl(ack.seq_no()); Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx()); + TrySignalAllAcksReceived(msgId); + TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus; if (ack.has_written_in_tx()) { msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX; @@ -998,7 +1118,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess ui64 offset = ack.has_written() ? ack.written().offset() : 0; acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ - GetIdImpl(sequenceNumber), + msgId, msgWriteStatus, TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { offset, @@ -1007,7 +1127,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat, }); - if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) { + if (CleanupOnAcknowledged(msgId)) { result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } @@ -1065,6 +1185,9 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) { (*Counters->BytesInflightTotal) = MemoryUsage; SentOriginalMessages.pop(); + + WrittenInTx.erase(id); + return result; } @@ -1550,10 +1673,22 @@ void TWriteSessionImpl::AbortImpl() { Cancel(ConnectDelayContext); if (Processor) Processor->Cancel(); - Cancel(ClientContext); ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. + CancelTransactions(); + } +} + +void TWriteSessionImpl::CancelTransactions() +{ + for (auto& [_, txInfo] : Txs) { + txInfo->IsActive = false; + if (txInfo->WriteCount != txInfo->AckCount) { + txInfo->AllAcksReceived.SetValue(MakeSessionExpiredError()); + } } + + Txs.clear(); } void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 74e47b15af4..2dfaa84b4c0 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -303,6 +303,19 @@ private: i64 Generation; }; + struct TTransactionInfo { + TSpinLock Lock; + bool IsActive = false; + bool Subscribed = false; + NThreading::TPromise<TStatus> AllAcksReceived; + bool CommitCalled = false; + ui64 WriteCount = 0; + ui64 AckCount = 0; + }; + + using TTransactionId = std::pair<TString, TString>; // SessionId, TxId + using TTransactionInfoPtr = std::shared_ptr<TTransactionInfo>; + THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action public: TWriteSessionImpl(const TWriteSessionSettings& settings, @@ -406,6 +419,13 @@ private: bool TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRequest* writeRequest) const; + void TrySubscribeOnTransactionCommit(TTransaction* tx); + void CancelTransactions(); + TTransactionInfoPtr GetOrCreateTxInfo(const TTransactionId& txId); + void TrySignalAllAcksReceived(ui64 seqNo); + + void OnTransactionCommit(); + private: TWriteSessionSettings Settings; std::shared_ptr<TTopicClient::TImpl> Client; @@ -467,6 +487,9 @@ private: TMaybe<ui64> DirectWriteToPartitionId; protected: ui64 MessagesAcquired = 0; + + THashMap<TTransactionId, TTransactionInfoPtr> Txs; + THashMap<ui64, TTransactionId> WrittenInTx; // SeqNo -> TxId }; } // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h index eb7587dcc3e..e2b3301b2b8 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/include/write_session.h @@ -17,6 +17,8 @@ namespace NYdb::NTable { namespace NYdb::NTopic { +using TTransaction = NTable::TTransaction; + //! Settings for write session. struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { using TSelf = TWriteSessionSettings; @@ -190,9 +192,9 @@ public: FLUENT_SETTING(TMessageMeta, MessageMeta); //! Transaction id - FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx); + FLUENT_SETTING_OPTIONAL(std::reference_wrapper<TTransaction>, Tx); - const NTable::TTransaction* GetTxPtr() const + TTransaction* GetTxPtr() const { return Tx_ ? &Tx_->get() : nullptr; } @@ -204,7 +206,9 @@ public: //! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded; //! return - true if write succeeded, false if message was not enqueued for write within blockTimeout. //! no Ack is provided. - virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0; + virtual bool Write(TWriteMessage&& message, + NTable::TTransaction* tx = nullptr, + const TDuration& blockTimeout = TDuration::Max()) = 0; //! Write single message. Deprecated method with only basic message options. @@ -249,7 +253,8 @@ public: //! Write single message. //! continuationToken - a token earlier provided to client with ReadyToAccept event. - virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0; + virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message, + NTable::TTransaction* tx = nullptr) = 0; //! Write single message. Old method with only basic message options. virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), @@ -257,7 +262,8 @@ public: //! Write single message that is already coded by codec. //! continuationToken - a token earlier provided to client with ReadyToAccept event. - virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0; + virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params, + NTable::TTransaction* tx = nullptr) = 0; //! Write single message that is already compressed by codec. Old method with only basic message options. virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 3c36b11a325..4b529bfaaad 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -109,7 +109,8 @@ protected: const TString& messageGroupId, NYdb::EStatus status); void CloseTopicWriteSession(const TString& topicPath, - const TString& messageGroupId); + const TString& messageGroupId, + bool force = false); void CloseTopicReadSession(const TString& topicPath, const TString& consumerName); @@ -243,21 +244,38 @@ NTable::TSession TFixture::CreateTableSession() NTable::TTransaction TFixture::BeginTx(NTable::TSession& session) { - auto result = session.BeginTransaction().ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - return result.GetTransaction(); + while (true) { + auto result = session.BeginTransaction().ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + return result.GetTransaction(); + } + Sleep(TDuration::MilliSeconds(100)); + } } void TFixture::CommitTx(NTable::TTransaction& tx, EStatus status) { - auto result = tx.Commit().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + while (true) { + auto result = tx.Commit().ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + return; + } + Sleep(TDuration::MilliSeconds(100)); + } } void TFixture::RollbackTx(NTable::TTransaction& tx, EStatus status) { - auto result = tx.Rollback().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + while (true) { + auto result = tx.Rollback().ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + return; + } + Sleep(TDuration::MilliSeconds(100)); + } } auto TFixture::CreateReader() -> TTopicReadSessionPtr @@ -490,15 +508,15 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) CommitTx(tx1, EStatus::ABORTED); } -Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) -{ - WriteToTopicWithInvalidTxId(false); -} - -Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) -{ - WriteToTopicWithInvalidTxId(true); -} +//Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) +//{ +// WriteToTopicWithInvalidTxId(false); +//} +// +//Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) +//{ +// WriteToTopicWithInvalidTxId(true); +//} Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) { @@ -694,7 +712,8 @@ void TFixture::TTopicWriteSessionContext::Write(const TString& message, NTable:: } void TFixture::CloseTopicWriteSession(const TString& topicPath, - const TString& messageGroupId) + const TString& messageGroupId, + bool force) { std::pair<TString, TString> key(topicPath, messageGroupId); auto i = TopicWriteSessions.find(key); @@ -703,7 +722,7 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath, TTopicWriteSessionContext& context = i->second; - context.Session->Close(); + context.Session->Close(force ? TDuration::MilliSeconds(0) : TDuration::Max()); TopicWriteSessions.erase(key); } @@ -972,9 +991,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #8", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #9", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); @@ -1022,11 +1038,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #8", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #9", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID_2); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); @@ -1069,14 +1080,9 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3"); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - } + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); CommitTx(tx, EStatus::ABORTED); @@ -1085,10 +1091,15 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); + + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); } Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) @@ -1102,41 +1113,27 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - NTable::TTransaction tx_2 = BeginTx(tableSession); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - } + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - } + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); CommitTx(tx_2, EStatus::SUCCESS); CommitTx(tx_1, EStatus::ABORTED); - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - } + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4"); - } + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4"); } Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) @@ -1152,9 +1149,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_1, EStatus::SUCCESS); } @@ -1164,9 +1158,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_2, EStatus::SUCCESS); } @@ -1195,8 +1186,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); @@ -1230,9 +1219,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #5", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #6", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); @@ -1261,8 +1247,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2"); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); @@ -1275,8 +1259,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); { @@ -1295,14 +1277,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_9, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - NTable::TTransaction tx_2 = BeginTx(tableSession); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); @@ -1329,8 +1307,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_1, EStatus::SUCCESS); } @@ -1339,8 +1315,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_2, EStatus::SUCCESS); } @@ -1534,6 +1508,7 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD for (auto& topic : d.Topics) { WriteToTopic(topic, TEST_MESSAGE_GROUP_ID, "message", &tx); + // TODO: нужен callback для RollbakTx WaitForAcks(topic, TEST_MESSAGE_GROUP_ID); } @@ -1554,6 +1529,10 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD for (auto& topic : d.Topics) { CheckTabletKeys(topic); } + + for (auto& topic : d.Topics) { + CloseTopicWriteSession(topic, TEST_MESSAGE_GROUP_ID); + } } Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture) @@ -1610,27 +1589,30 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_14, TFixture) CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); CommitTx(tx, EStatus::ABORTED); } Y_UNIT_TEST_F(WriteToTopic_Demo_15, TFixture) { + // the session of writing to the topic can be closed before the commit CreateTopic("topic_A"); NTable::TSession tableSession = CreateTableSession(); NTable::TTransaction tx = BeginTx(tableSession); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #1", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_1); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_2); CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); } Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture) @@ -1643,8 +1625,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - RestartPQTablet("topic_A", 0); CommitTx(tx, EStatus::SUCCESS); @@ -1672,8 +1652,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_17, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(20'000'000, 'x'), &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString( 7'000'000, 'x'), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); //RestartPQTablet("topic_A", 0); @@ -1703,19 +1681,16 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) for (size_t i = 0; i < params.OldHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x')); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++oldHeadMsgCount; } for (size_t i = 0; i < params.BigBlobsCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++bigBlobMsgCount; } for (size_t i = 0; i < params.NewHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++newHeadMsgCount; } @@ -1879,8 +1854,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture) WriteToTable("table_A", records, &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); @@ -1914,8 +1887,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, m, &tx); } - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); @@ -1947,8 +1918,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, m, &tx, PARTITION_1); } - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, PARTITION_1); @@ -1961,7 +1930,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) CreateTopic("topic_B", TEST_CONSUMER); CreateTopic("topic_C", TEST_CONSUMER); - for (size_t i = 0, writtenInTx = 0; i < 2; ++i) { + for (size_t i = 0; i < 2; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0); @@ -1971,14 +1940,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); - ++writtenInTx; - WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx); messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); - ++writtenInTx; - WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx); CommitTx(tx, EStatus::SUCCESS); @@ -2003,12 +1968,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture) TString message(16'000, 'a'); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(16'000, 'a'), &tx, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); CommitTx(tx, EStatus::SUCCESS); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(20'000, 'b'), nullptr, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); @@ -2023,12 +1986,10 @@ void TFixture::WriteMessagesInTx(size_t big, size_t small) for (size_t i = 0; i < big; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); } for (size_t i = 0; i < small; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); } CommitTx(tx, EStatus::SUCCESS); @@ -2108,8 +2069,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - AddConsumer("topic_A", {"consumer"}); CommitTx(tx, EStatus::SUCCESS); @@ -2153,6 +2112,42 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture) UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); } +Y_UNIT_TEST_F(WriteToTopic_Demo_40, TFixture) +{ + // The recording stream will run into a quota. Before the commit, the client will receive confirmations + // for some of the messages. The `CommitTx` call will wait for the rest. + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_41, TFixture) +{ + // If the recording session does not wait for confirmations, the commit will fail + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID, true); // force close + + CommitTx(tx, EStatus::SESSION_EXPIRED); +} + } } diff --git a/ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp b/ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp new file mode 100644 index 00000000000..77030e2e2fd --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_writer/transaction/main.cpp @@ -0,0 +1,44 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +void ThrowOnError(const NYdb::TStatus& status) +{ + if (status.IsSuccess()) { + return; + } + + ythrow yexception() << status; +} + +int main() +{ + const TString ENDPOINT = "HOST:PORT"; + const TString DATABASE = "DATABASE"; + const TString TOPIC = "PATH/TO/TOPIC"; + + NYdb::TDriverConfig config; + config.SetEndpoint(ENDPOINT); + config.SetDatabase(DATABASE); + NYdb::TDriver driver(config); + + NYdb::NTable::TTableClient tableClient(driver); + auto getTableSessionResult = tableClient.GetSession().GetValueSync(); + ThrowOnError(getTableSessionResult); + auto tableSession = getTableSessionResult.GetSession(); + + NYdb::NTopic::TTopicClient topicClient(driver); + auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings() + .Path(TOPIC) + .DeduplicationEnabled(true); + auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings); + + auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync(); + ThrowOnError(beginTransactionResult); + auto transaction = beginTransactionResult.GetTransaction(); + + NYdb::NTopic::TWriteMessage writeMessage("message"); + + topicSession->Write(std::move(writeMessage), &transaction); + + transaction.Commit().GetValueSync(); +} |