diff options
| author | ildar-khisam <[email protected]> | 2023-12-04 12:08:36 +0300 |
|---|---|---|
| committer | ildar-khisam <[email protected]> | 2023-12-04 15:59:20 +0300 |
| commit | 8a17f12ce8da549aecb623bc406523885c8dba4e (patch) | |
| tree | 42cc5a95aa5b411887cd9c9edda4c7eb7b3a5200 | |
| parent | 0311ea8d1618ae1f7384ef3342432fa33aa0df0c (diff) | |
fix topic write sdk bug
fix bug with wrapping handlers
reproduce bug
correct handler to write session events
4 files changed, 78 insertions, 21 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 8f3a6c4862d..20d1484e408 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -55,7 +55,6 @@ TWriteSessionImpl::TWriteSessionImpl( , Connections(std::move(connections)) , DbDriverState(std::move(dbDriverState)) , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") - , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) , InitSeqNoPromise(NThreading::NewPromise<ui64>()) , WakeupInterval( Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? @@ -77,6 +76,25 @@ TWriteSessionImpl::TWriteSessionImpl( void TWriteSessionImpl::Start(const TDuration& delay) { Y_ABORT_UNLESS(SelfContext); + + if (!EventsQueue) { +#define WRAP_HANDLER(type, handler, ...) \ + if (auto h = Settings.EventHandlers_.handler##_) { \ + Settings.EventHandlers_.handler([ctx = SelfContext, h = std::move(h)](__VA_ARGS__ type& ev){ \ + if (auto self = ctx->LockShared()) { \ + h(ev); \ + } \ + }); \ + } + WRAP_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler); + WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander); + WRAP_HANDLER(TSessionClosedEvent, SessionClosedHandler, const); + WRAP_HANDLER(TWriteSessionEvent::TEvent, CommonHandler); +#undef WRAP_HANDLER + + EventsQueue = std::make_shared<TWriteSessionEventsQueue>(Settings); + } + ++ConnectionAttemptsDone; if (!Started) { with_lock(Lock) { @@ -398,7 +416,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; } if (readyToAccept) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } @@ -794,7 +812,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess OnErrorResolved(); if (!FirstTokenSent) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); FirstTokenSent = true; } // Kickstart send after session reestablishment @@ -846,7 +864,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess }); if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } //EventsQueue->PushEvent(std::move(acksEvent)); @@ -986,7 +1004,7 @@ void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) { memoryUsage = OnCompressedImpl(std::move(block)); } if (memoryUsage.NowOk && !memoryUsage.WasOk) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 3643cff0464..5c6f9c41dd3 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -103,14 +103,16 @@ public: private: struct THandlersVisitor : public TParent::TBaseHandlersVisitor { using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; -#define DECLARE_HANDLER(type, handler, answer) \ - bool operator()(type& event) { \ - if (Settings.EventHandlers_.handler) { \ - Settings.EventHandlers_.handler(event); \ - return answer; \ - } \ - return false; \ - } \ +#define DECLARE_HANDLER(type, handler, answer) \ + bool operator()(type&) { \ + if (this->PushHandler<type>( \ + std::move(TParent::TBaseHandlersVisitor::Event), \ + this->Settings.EventHandlers_.handler, \ + this->Settings.EventHandlers_.CommonHandler_)) { \ + return answer; \ + } \ + return false; \ + } \ /**/ DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); @@ -149,7 +151,8 @@ struct TMemoryUsageChange { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TWriteSessionImpl -class TWriteSessionImpl : public NPersQueue::TEnableSelfContext<TWriteSessionImpl> { +class TWriteSessionImpl : public TContinuationTokenIssuer, + public NPersQueue::TEnableSelfContext<TWriteSessionImpl> { private: friend class TWriteSession; friend class TSimpleBlockingWriteSession; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 4ca544044cc..f1080b3fb75 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -2,6 +2,7 @@ #include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h> #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/logger/log.h> @@ -652,12 +653,35 @@ struct TWriteStat : public TThrRefBase { using TPtr = TIntrusivePtr<TWriteStat>; }; -class TContinuationToken : public TMoveOnly { - friend class TWriteSessionImpl; +class TContinuationToken : public TNonCopyable { + friend class TContinuationTokenIssuer; + + bool Valid = true; + +public: + TContinuationToken& operator=(TContinuationToken&& other) { + if (!other.Valid) { + ythrow TContractViolation("Cannot move invalid token"); + } + Valid = std::exchange(other.Valid, false); + return *this; + } + + TContinuationToken(TContinuationToken&& other) { + *this = std::move(other); + } + private: TContinuationToken() = default; }; +class TContinuationTokenIssuer { +protected: + static auto IssueContinuationToken() { + return TContinuationToken{}; + } +}; + struct TWriterCounters : public TThrRefBase { using TSelf = TWriterCounters; using TPtr = TIntrusivePtr<TSelf>; @@ -1224,7 +1248,19 @@ struct TWriteSessionEvent { //! Indicates that a writer is ready to accept new message(s). //! Continuation token should be kept and then used in write methods. struct TReadyToAcceptEvent : public TPrintable<TReadyToAcceptEvent> { - TContinuationToken ContinuationToken; + mutable TContinuationToken ContinuationToken; + + TReadyToAcceptEvent() = delete; + TReadyToAcceptEvent(TContinuationToken&& t) : ContinuationToken(std::move(t)) { + } + TReadyToAcceptEvent(TReadyToAcceptEvent&&) = default; + TReadyToAcceptEvent(const TReadyToAcceptEvent& other) : ContinuationToken(std::move(other.ContinuationToken)) { + } + TReadyToAcceptEvent& operator=(TReadyToAcceptEvent&&) = default; + TReadyToAcceptEvent& operator=(const TReadyToAcceptEvent& other) { + ContinuationToken = std::move(other.ContinuationToken); + return *this; + } }; using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index f4d90703f21..8b4dc036134 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -138,7 +138,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { TTopicSdkTestSetup setup(TEST_CASE_NAME); TTopicClient client = setup.MakeClient(); - { + for (size_t i = 0; i < 100; ++i) { auto writeSettings = TWriteSessionSettings() .Path(TEST_TOPIC) .ProducerId(TEST_MESSAGE_GROUP_ID) @@ -176,9 +176,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { dataReceived.Commit(); auto& messages = dataReceived.GetMessages(); - UNIT_ASSERT(messages.size() == 2); + UNIT_ASSERT(messages.size() == 101); UNIT_ASSERT(messages[0].GetData() == "message_using_MessageGroupId"); - UNIT_ASSERT(messages[1].GetData() == "message_using_PartitionId"); + UNIT_ASSERT(messages[100].GetData() == "message_using_PartitionId"); } } @@ -195,7 +195,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .MaxMemoryUsageBytes(1_MB) .DecompressionExecutor(decompressor) .AppendTopics(topic); - + TWriteSessionSettings writeSettings; writeSettings .Path(TEST_TOPIC) |
