diff options
author | Alek5andr-Kotov <152866892+Alek5andr-Kotov@users.noreply.github.com> | 2024-03-05 20:17:23 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-05 20:17:23 +0300 |
commit | b82d8c102598370dc0f62fd7f9910baa878edb2b (patch) | |
tree | 8594a04580c4d87f08c37ab26239ed5bd177886e | |
parent | 856e900053afe3d69f4e78622d7c404a91e76f86 (diff) | |
download | ydb-b82d8c102598370dc0f62fd7f9910baa878edb2b.tar.gz |
two writing sessions and one transaction (#2388)
-rw-r--r-- | ydb/core/persqueue/writer/writer.cpp | 42 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp | 64 |
2 files changed, 103 insertions, 3 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 297f8111ba..67bfcaf108 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -20,6 +20,8 @@ #include <util/generic/map.h> #include <util/string/builder.h> +#include <library/cpp/retry/retry_policy.h> + namespace NKikimr::NPQ { #if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) @@ -186,6 +188,16 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl return InitResult(reason, std::move(response)); } + void Retry(Ydb::StatusIds::StatusCode code) { + if (!RetryState) { + RetryState = GetRetryPolicy()->CreateRetryState(); + } + + if (auto delay = RetryState->GetNextRetryDelay(code); delay.Defined()) { + Schedule(*delay, new TEvents::TEvWakeup()); + } + } + template <typename... Args> void SendWriteResult(Args&&... args) { Send(Client, new TEvPartitionWriter::TEvWriteResponse(Opts.SessionId, Opts.TxId, std::forward<Args>(args)...)); @@ -222,16 +234,21 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl switch (ev->GetTypeRewrite()) { HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleWriteId); hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending); + SFunc(TEvents::TEvWakeup, GetWriteId); default: return StateBase(ev); } } void HandleWriteId(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ctx); - auto& record = ev->Get()->Record.GetRef(); - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + switch (record.GetYdbStatus()) { + case Ydb::StatusIds::SUCCESS: + break; + case Ydb::StatusIds::SESSION_BUSY: + case Ydb::StatusIds::PRECONDITION_FAILED: // see TKqpSessionActor::ReplyBusy + return Retry(record.GetYdbStatus()); + default: return InitResult("Invalid KQP session", record); } @@ -848,6 +865,25 @@ private: EErrorCode ErrorCode = EErrorCode::InternalError; ui64 WriteId = INVALID_WRITE_ID; + + using IRetryPolicy = IRetryPolicy<Ydb::StatusIds::StatusCode>; + using IRetryState = IRetryPolicy::IRetryState; + + static IRetryPolicy::TPtr GetRetryPolicy() { + return IRetryPolicy::GetExponentialBackoffPolicy(Retryable); + }; + + static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode code) { + switch (code) { + case Ydb::StatusIds::SESSION_BUSY: + case Ydb::StatusIds::PRECONDITION_FAILED: + return ERetryErrorClass::ShortRetry; + default: + return ERetryErrorClass::NoRetry; + } + }; + + IRetryState::TPtr RetryState; }; // TPartitionWriter diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 678642f2d8..6574cb9cda 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -321,6 +321,70 @@ Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) WriteToTopicWithInvalidTxId(true); } +Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) +{ + TString topicPath[2] = { + TEST_TOPIC, + TEST_TOPIC + "_2" + }; + + CreateTopic(topicPath[1]); + + auto createWriteSession = [](NTopic::TTopicClient& client, const TString& topicPath) { + NTopic::TWriteSessionSettings options; + options.Path(topicPath); + options.MessageGroupId(TEST_MESSAGE_GROUP_ID); + + return client.CreateWriteSession(options); + }; + + auto writeMessage = [](auto& ws, const TString& message, auto& tx) { + NTopic::TWriteMessage params(message); + params.Tx(tx); + + auto event = ws->GetEvent(true); + UNIT_ASSERT(event.Defined() && std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event.GetRef())); + auto token = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event.GetRef()).ContinuationToken); + + ws->Write(std::move(token), std::move(params)); + }; + + auto tableSession = CreateSession(); + auto tx = BeginTx(tableSession); + + NTopic::TTopicClient client(GetDriver()); + + auto ws0 = createWriteSession(client, topicPath[0]); + auto ws1 = createWriteSession(client, topicPath[1]); + + writeMessage(ws0, "message-1", tx); + writeMessage(ws1, "message-2", tx); + + size_t acks = 0; + + while (acks < 2) { + auto event = ws0->GetEvent(false); + if (!event) { + event = ws1->GetEvent(false); + if (!event) { + Sleep(TDuration::MilliSeconds(10)); + continue; + } + } + + auto& v = event.GetRef(); + if (auto e = std::get_if<TWriteSessionEvent::TAcksEvent>(&v); e) { + ++acks; + } else if (auto e = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&v); e) { + ; + } else if (auto e = std::get_if<TSessionClosedEvent>(&v); e) { + break; + } + } + + UNIT_ASSERT_VALUES_EQUAL(acks, 2); +} + } } |