aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <152866892+Alek5andr-Kotov@users.noreply.github.com>2024-03-05 20:17:23 +0300
committerGitHub <noreply@github.com>2024-03-05 20:17:23 +0300
commitb82d8c102598370dc0f62fd7f9910baa878edb2b (patch)
tree8594a04580c4d87f08c37ab26239ed5bd177886e
parent856e900053afe3d69f4e78622d7c404a91e76f86 (diff)
downloadydb-b82d8c102598370dc0f62fd7f9910baa878edb2b.tar.gz
two writing sessions and one transaction (#2388)
-rw-r--r--ydb/core/persqueue/writer/writer.cpp42
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp64
2 files changed, 103 insertions, 3 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 297f8111ba..67bfcaf108 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -20,6 +20,8 @@
#include <util/generic/map.h>
#include <util/string/builder.h>
+#include <library/cpp/retry/retry_policy.h>
+
namespace NKikimr::NPQ {
#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
@@ -186,6 +188,16 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return InitResult(reason, std::move(response));
}
+ void Retry(Ydb::StatusIds::StatusCode code) {
+ if (!RetryState) {
+ RetryState = GetRetryPolicy()->CreateRetryState();
+ }
+
+ if (auto delay = RetryState->GetNextRetryDelay(code); delay.Defined()) {
+ Schedule(*delay, new TEvents::TEvWakeup());
+ }
+ }
+
template <typename... Args>
void SendWriteResult(Args&&... args) {
Send(Client, new TEvPartitionWriter::TEvWriteResponse(Opts.SessionId, Opts.TxId, std::forward<Args>(args)...));
@@ -222,16 +234,21 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
switch (ev->GetTypeRewrite()) {
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleWriteId);
hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending);
+ SFunc(TEvents::TEvWakeup, GetWriteId);
default:
return StateBase(ev);
}
}
void HandleWriteId(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- Y_UNUSED(ctx);
-
auto& record = ev->Get()->Record.GetRef();
- if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ switch (record.GetYdbStatus()) {
+ case Ydb::StatusIds::SUCCESS:
+ break;
+ case Ydb::StatusIds::SESSION_BUSY:
+ case Ydb::StatusIds::PRECONDITION_FAILED: // see TKqpSessionActor::ReplyBusy
+ return Retry(record.GetYdbStatus());
+ default:
return InitResult("Invalid KQP session", record);
}
@@ -848,6 +865,25 @@ private:
EErrorCode ErrorCode = EErrorCode::InternalError;
ui64 WriteId = INVALID_WRITE_ID;
+
+ using IRetryPolicy = IRetryPolicy<Ydb::StatusIds::StatusCode>;
+ using IRetryState = IRetryPolicy::IRetryState;
+
+ static IRetryPolicy::TPtr GetRetryPolicy() {
+ return IRetryPolicy::GetExponentialBackoffPolicy(Retryable);
+ };
+
+ static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode code) {
+ switch (code) {
+ case Ydb::StatusIds::SESSION_BUSY:
+ case Ydb::StatusIds::PRECONDITION_FAILED:
+ return ERetryErrorClass::ShortRetry;
+ default:
+ return ERetryErrorClass::NoRetry;
+ }
+ };
+
+ IRetryState::TPtr RetryState;
}; // TPartitionWriter
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index 678642f2d8..6574cb9cda 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -321,6 +321,70 @@ Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture)
WriteToTopicWithInvalidTxId(true);
}
+Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture)
+{
+ TString topicPath[2] = {
+ TEST_TOPIC,
+ TEST_TOPIC + "_2"
+ };
+
+ CreateTopic(topicPath[1]);
+
+ auto createWriteSession = [](NTopic::TTopicClient& client, const TString& topicPath) {
+ NTopic::TWriteSessionSettings options;
+ options.Path(topicPath);
+ options.MessageGroupId(TEST_MESSAGE_GROUP_ID);
+
+ return client.CreateWriteSession(options);
+ };
+
+ auto writeMessage = [](auto& ws, const TString& message, auto& tx) {
+ NTopic::TWriteMessage params(message);
+ params.Tx(tx);
+
+ auto event = ws->GetEvent(true);
+ UNIT_ASSERT(event.Defined() && std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event.GetRef()));
+ auto token = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event.GetRef()).ContinuationToken);
+
+ ws->Write(std::move(token), std::move(params));
+ };
+
+ auto tableSession = CreateSession();
+ auto tx = BeginTx(tableSession);
+
+ NTopic::TTopicClient client(GetDriver());
+
+ auto ws0 = createWriteSession(client, topicPath[0]);
+ auto ws1 = createWriteSession(client, topicPath[1]);
+
+ writeMessage(ws0, "message-1", tx);
+ writeMessage(ws1, "message-2", tx);
+
+ size_t acks = 0;
+
+ while (acks < 2) {
+ auto event = ws0->GetEvent(false);
+ if (!event) {
+ event = ws1->GetEvent(false);
+ if (!event) {
+ Sleep(TDuration::MilliSeconds(10));
+ continue;
+ }
+ }
+
+ auto& v = event.GetRef();
+ if (auto e = std::get_if<TWriteSessionEvent::TAcksEvent>(&v); e) {
+ ++acks;
+ } else if (auto e = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&v); e) {
+ ;
+ } else if (auto e = std::get_if<TSessionClosedEvent>(&v); e) {
+ break;
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(acks, 2);
+}
+
}
}