summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <[email protected]>2023-12-04 12:08:36 +0300
committerildar-khisam <[email protected]>2023-12-04 15:59:20 +0300
commit8a17f12ce8da549aecb623bc406523885c8dba4e (patch)
tree42cc5a95aa5b411887cd9c9edda4c7eb7b3a5200
parent0311ea8d1618ae1f7384ef3342432fa33aa0df0c (diff)
fix topic write sdk bug
fix bug with wrapping handlers reproduce bug correct handler to write session events
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h21
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h42
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp8
4 files changed, 78 insertions, 21 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 8f3a6c4862d..20d1484e408 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -55,7 +55,6 @@ TWriteSessionImpl::TWriteSessionImpl(
, Connections(std::move(connections))
, DbDriverState(std::move(dbDriverState))
, PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "")
- , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings))
, InitSeqNoPromise(NThreading::NewPromise<ui64>())
, WakeupInterval(
Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ?
@@ -77,6 +76,25 @@ TWriteSessionImpl::TWriteSessionImpl(
void TWriteSessionImpl::Start(const TDuration& delay) {
Y_ABORT_UNLESS(SelfContext);
+
+ if (!EventsQueue) {
+#define WRAP_HANDLER(type, handler, ...) \
+ if (auto h = Settings.EventHandlers_.handler##_) { \
+ Settings.EventHandlers_.handler([ctx = SelfContext, h = std::move(h)](__VA_ARGS__ type& ev){ \
+ if (auto self = ctx->LockShared()) { \
+ h(ev); \
+ } \
+ }); \
+ }
+ WRAP_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler);
+ WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander);
+ WRAP_HANDLER(TSessionClosedEvent, SessionClosedHandler, const);
+ WRAP_HANDLER(TWriteSessionEvent::TEvent, CommonHandler);
+#undef WRAP_HANDLER
+
+ EventsQueue = std::make_shared<TWriteSessionEventsQueue>(Settings);
+ }
+
++ConnectionAttemptsDone;
if (!Started) {
with_lock(Lock) {
@@ -398,7 +416,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
}
if (readyToAccept) {
- EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
@@ -794,7 +812,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
OnErrorResolved();
if (!FirstTokenSent) {
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
FirstTokenSent = true;
}
// Kickstart send after session reestablishment
@@ -846,7 +864,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
});
if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) {
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
//EventsQueue->PushEvent(std::move(acksEvent));
@@ -986,7 +1004,7 @@ void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
memoryUsage = OnCompressedImpl(std::move(block));
}
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
- EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 3643cff0464..5c6f9c41dd3 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -103,14 +103,16 @@ public:
private:
struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor;
-#define DECLARE_HANDLER(type, handler, answer) \
- bool operator()(type& event) { \
- if (Settings.EventHandlers_.handler) { \
- Settings.EventHandlers_.handler(event); \
- return answer; \
- } \
- return false; \
- } \
+#define DECLARE_HANDLER(type, handler, answer) \
+ bool operator()(type&) { \
+ if (this->PushHandler<type>( \
+ std::move(TParent::TBaseHandlersVisitor::Event), \
+ this->Settings.EventHandlers_.handler, \
+ this->Settings.EventHandlers_.CommonHandler_)) { \
+ return answer; \
+ } \
+ return false; \
+ } \
/**/
DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true);
DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true);
@@ -149,7 +151,8 @@ struct TMemoryUsageChange {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TWriteSessionImpl
-class TWriteSessionImpl : public NPersQueue::TEnableSelfContext<TWriteSessionImpl> {
+class TWriteSessionImpl : public TContinuationTokenIssuer,
+ public NPersQueue::TEnableSelfContext<TWriteSessionImpl> {
private:
friend class TWriteSession;
friend class TSimpleBlockingWriteSession;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 4ca544044cc..f1080b3fb75 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -2,6 +2,7 @@
#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/log.h>
@@ -652,12 +653,35 @@ struct TWriteStat : public TThrRefBase {
using TPtr = TIntrusivePtr<TWriteStat>;
};
-class TContinuationToken : public TMoveOnly {
- friend class TWriteSessionImpl;
+class TContinuationToken : public TNonCopyable {
+ friend class TContinuationTokenIssuer;
+
+ bool Valid = true;
+
+public:
+ TContinuationToken& operator=(TContinuationToken&& other) {
+ if (!other.Valid) {
+ ythrow TContractViolation("Cannot move invalid token");
+ }
+ Valid = std::exchange(other.Valid, false);
+ return *this;
+ }
+
+ TContinuationToken(TContinuationToken&& other) {
+ *this = std::move(other);
+ }
+
private:
TContinuationToken() = default;
};
+class TContinuationTokenIssuer {
+protected:
+ static auto IssueContinuationToken() {
+ return TContinuationToken{};
+ }
+};
+
struct TWriterCounters : public TThrRefBase {
using TSelf = TWriterCounters;
using TPtr = TIntrusivePtr<TSelf>;
@@ -1224,7 +1248,19 @@ struct TWriteSessionEvent {
//! Indicates that a writer is ready to accept new message(s).
//! Continuation token should be kept and then used in write methods.
struct TReadyToAcceptEvent : public TPrintable<TReadyToAcceptEvent> {
- TContinuationToken ContinuationToken;
+ mutable TContinuationToken ContinuationToken;
+
+ TReadyToAcceptEvent() = delete;
+ TReadyToAcceptEvent(TContinuationToken&& t) : ContinuationToken(std::move(t)) {
+ }
+ TReadyToAcceptEvent(TReadyToAcceptEvent&&) = default;
+ TReadyToAcceptEvent(const TReadyToAcceptEvent& other) : ContinuationToken(std::move(other.ContinuationToken)) {
+ }
+ TReadyToAcceptEvent& operator=(TReadyToAcceptEvent&&) = default;
+ TReadyToAcceptEvent& operator=(const TReadyToAcceptEvent& other) {
+ ContinuationToken = std::move(other.ContinuationToken);
+ return *this;
+ }
};
using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index f4d90703f21..8b4dc036134 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -138,7 +138,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
TTopicClient client = setup.MakeClient();
- {
+ for (size_t i = 0; i < 100; ++i) {
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.ProducerId(TEST_MESSAGE_GROUP_ID)
@@ -176,9 +176,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
dataReceived.Commit();
auto& messages = dataReceived.GetMessages();
- UNIT_ASSERT(messages.size() == 2);
+ UNIT_ASSERT(messages.size() == 101);
UNIT_ASSERT(messages[0].GetData() == "message_using_MessageGroupId");
- UNIT_ASSERT(messages[1].GetData() == "message_using_PartitionId");
+ UNIT_ASSERT(messages[100].GetData() == "message_using_PartitionId");
}
}
@@ -195,7 +195,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.MaxMemoryUsageBytes(1_MB)
.DecompressionExecutor(decompressor)
.AppendTopics(topic);
-
+
TWriteSessionSettings writeSettings;
writeSettings
.Path(TEST_TOPIC)