aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-12-15 14:01:16 +0300
committerildar-khisam <ikhis@ydb.tech>2023-12-15 15:40:44 +0300
commit5d7e58e04264a7572256b91a64e2db24b6c47fb9 (patch)
tree4da5f417f62098b4e187a39ddf2038739d36ebe4
parent7a3e28d96b991a9beda1e94549207dd1878f3bc2 (diff)
downloadydb-5d7e58e04264a7572256b91a64e2db24b6c47fb9.tar.gz
federated topic write
federated write draft
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h21
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp23
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp14
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp27
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp294
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h110
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp81
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp6
20 files changed, 562 insertions, 48 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
index c3fb5c7676..121097e39d 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
@@ -238,12 +238,21 @@ struct TFederatedWriteSessionSettings : public NTopic::TWriteSessionSettings {
TFederatedWriteSessionSettings() = default;
TFederatedWriteSessionSettings(const TFederatedWriteSessionSettings&) = default;
TFederatedWriteSessionSettings(TFederatedWriteSessionSettings&&) = default;
+ TFederatedWriteSessionSettings& operator=(const TFederatedWriteSessionSettings&) = default;
+ TFederatedWriteSessionSettings& operator=(TFederatedWriteSessionSettings&&) = default;
+
TFederatedWriteSessionSettings(const TString& path, const TString& producerId, const TString& messageGroupId)
: NTopic::TWriteSessionSettings(path, producerId, messageGroupId) {
}
- TFederatedWriteSessionSettings& operator=(const TFederatedWriteSessionSettings&) = default;
- TFederatedWriteSessionSettings& operator=(TFederatedWriteSessionSettings&&) = default;
+ TFederatedWriteSessionSettings(const NTopic::TWriteSessionSettings& settings)
+ : NTopic::TWriteSessionSettings(settings) {
+ }
+ TFederatedWriteSessionSettings(NTopic::TWriteSessionSettings&& settings)
+ : NTopic::TWriteSessionSettings(std::move(settings)) {
+ }
+ // TFederatedWriteSessionSettings& operator=(const NTopic::TWriteSessionSettings&);
+ // TFederatedWriteSessionSettings& operator=(NTopic::TWriteSessionSettings&&);
};
//! Settings for read session.
@@ -365,8 +374,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
READ_MIRRORED
};
- // optional for read_mirrored case ?
-
//! Policy for federated reading.
//!
//! READ_ALL: read will be done from all topic instances from all databases.
@@ -442,11 +449,11 @@ public:
TFederatedTopicClient(const TDriver& driver, const TFederatedTopicClientSettings& settings = {});
//! Create read session.
- std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings);
+ std::shared_ptr<IFederatedReadSession> CreateReadSession(const TFederatedReadSessionSettings& settings);
//! Create write session.
- // std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingFederatedWriteSession(const TFederatedWriteSessionSettings& settings);
- // std::shared_ptr<NTopic::IWriteSession> CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings);
+ // std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings);
+ std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings);
private:
std::shared_ptr<TImpl> Impl_;
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt
index d33d0f6b51..bea5a60f1e 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt
@@ -27,6 +27,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC
target_sources(client-ydb_federated_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt
index d33d0f6b51..bea5a60f1e 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC
target_sources(client-ydb_federated_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt
index 0847aec54a..efd677ea08 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt
@@ -28,6 +28,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC
target_sources(client-ydb_federated_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt
index 0847aec54a..efd677ea08 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt
@@ -28,6 +28,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC
target_sources(client-ydb_federated_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt
index d33d0f6b51..bea5a60f1e 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC
target_sources(client-ydb_federated_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
index 07645caf6e..7563f2c85a 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
@@ -73,6 +73,9 @@ void TFederatedReadSessionImpl::Start() {
Y_UNUSED(f);
if (auto self = selfCtx->LockShared()) {
with_lock(self->Lock) {
+ if (self->Closing) {
+ return;
+ }
self->FederationState = self->Observer->GetState();
self->OnFederatedStateUpdateImpl();
}
@@ -110,11 +113,11 @@ NThreading::TFuture<void> TFederatedReadSessionImpl::WaitEvent() {
// TODO override with read session settings timeout
return AsyncInit.Apply([selfCtx = SelfContext](const NThreading::TFuture<void>) {
if (auto self = selfCtx->LockShared()) {
- if (self->Closing) {
- return NThreading::MakeFuture();
- }
std::vector<NThreading::TFuture<void>> waiters;
with_lock(self->Lock) {
+ if (self->Closing) {
+ return NThreading::MakeFuture();
+ }
Y_ABORT_UNLESS(!self->SubSessions.empty(), "SubSessions empty in discovered state");
for (const auto& sub : self->SubSessions) {
waiters.emplace_back(sub.Session->WaitEvent());
@@ -162,12 +165,16 @@ void TFederatedReadSessionImpl::CloseImpl() {
}
bool TFederatedReadSessionImpl::Close(TDuration timeout) {
- bool result = true;
- for (const auto& sub : SubSessions) {
- // TODO substract from user timeout
- result = sub.Session->Close(timeout);
+ with_lock(Lock) {
+ Closing = true;
+
+ bool result = true;
+ for (const auto& sub : SubSessions) {
+ // TODO substract from user timeout
+ result = sub.Session->Close(timeout);
+ }
+ return result;
}
- return result;
}
} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
index b073241d52..c0444f15a2 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
@@ -55,16 +55,8 @@ private:
void OnFederatedStateUpdateImpl();
- void Abort();
void CloseImpl();
- void ClearAllEvents();
-
- // TODO Counters
- // void MakeCountersIfNeeded();
- // void DumpCountersToLog(size_t timeNumber = 0);
- // void ScheduleDumpCountersToLog(size_t timeNumber = 0);
-
private:
TFederatedReadSessionSettings Settings;
@@ -87,8 +79,6 @@ private:
std::vector<TSubSession> SubSessions;
size_t SubsessionIndex = 0;
- // NYdbGrpc::IQueueClientContextPtr DumpCountersContext;
-
// Exiting.
bool Closing = false;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
index fa135df282..ca076d602d 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
@@ -14,17 +14,17 @@ TFederatedTopicClient::TFederatedTopicClient(const TDriver& driver, const TFeder
{
}
-std::shared_ptr<IFederatedReadSession> TFederatedTopicClient::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) {
- return Impl_->CreateFederatedReadSession(settings);
+std::shared_ptr<IFederatedReadSession> TFederatedTopicClient::CreateReadSession(const TFederatedReadSessionSettings& settings) {
+ return Impl_->CreateReadSession(settings);
}
-// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> TFederatedTopicClient::CreateSimpleBlockingFederatedWriteSession(
+// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> TFederatedTopicClient::CreateSimpleBlockingWriteSession(
// const TFederatedWriteSessionSettings& settings) {
-// return Impl_->CreateSimpleFederatedWriteSession(settings);
+// return Impl_->CreateSimpleBlockingWriteSession(settings);
// }
-// std::shared_ptr<NTopic::IWriteSession> TFederatedTopicClient::CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings) {
-// return Impl_->CreateFederatedWriteSession(settings);
-// }
+std::shared_ptr<NTopic::IWriteSession> TFederatedTopicClient::CreateWriteSession(const TFederatedWriteSessionSettings& settings) {
+ return Impl_->CreateWriteSession(settings);
+}
} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
index fa091252b5..2de842b5cd 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
@@ -1,7 +1,7 @@
#include "federated_topic_impl.h"
#include "federated_read_session.h"
-// #include "federated_write_session.h"
+#include "federated_write_session.h"
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
@@ -9,13 +9,36 @@
namespace NYdb::NFederatedTopic {
std::shared_ptr<IFederatedReadSession>
-TFederatedTopicClient::TImpl::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) {
+TFederatedTopicClient::TImpl::CreateReadSession(const TFederatedReadSessionSettings& settings) {
InitObserver();
auto session = std::make_shared<TFederatedReadSession>(settings, Connections, ClientSettings, GetObserver());
session->Start();
return std::move(session);
}
+// std::shared_ptr<NTopic::ISimpleBlockingWriteSession>
+// TFederatedTopicClient::TImpl::CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings) {
+// InitObserver();
+// auto session = std::make_shared<TSimpleBlockingFederatedWriteSession>(settings, Connections, ClientSettings, GetObserver());
+// session->Start();
+// return std::move(session);
+
+// }
+
+std::shared_ptr<NTopic::IWriteSession>
+TFederatedTopicClient::TImpl::CreateWriteSession(const TFederatedWriteSessionSettings& settings) {
+ // Split settings.MaxMemoryUsage_ by two.
+ // One half goes to subsession. Other half goes to federated session internal buffer.
+ const ui64 splitSize = (settings.MaxMemoryUsage_ + 1) / 2;
+ TFederatedWriteSessionSettings splitSettings = settings;
+ splitSettings.MaxMemoryUsage(splitSize);
+ InitObserver();
+ auto session = std::make_shared<TFederatedWriteSession>(splitSettings, Connections, ClientSettings, GetObserver());
+ session->Start();
+ return std::move(session);
+
+}
+
void TFederatedTopicClient::TImpl::InitObserver() {
with_lock(Lock) {
if (!Observer || Observer->IsStale()) {
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h
index c963b3a94a..cf807776ad 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h
@@ -33,7 +33,10 @@ public:
}
// Runtime API.
- std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings);
+ std::shared_ptr<IFederatedReadSession> CreateReadSession(const TFederatedReadSessionSettings& settings);
+
+ std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings);
+ std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings);
std::shared_ptr<TFederatedDbObserver> GetObserver() {
with_lock(Lock) {
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
new file mode 100644
index 0000000000..d011480b95
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
@@ -0,0 +1,294 @@
+#include "federated_write_session.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
+
+#include <library/cpp/threading/future/future.h>
+
+#include <algorithm>
+
+namespace NYdb::NFederatedTopic {
+
+constexpr TDuration UPDATE_FEDERATION_STATE_DELAY = TDuration::Seconds(10);
+
+bool DatabasesAreSame(std::shared_ptr<TDbInfo> lhs, std::shared_ptr<TDbInfo> rhs) {
+ if (!lhs || !rhs) {
+ return false;
+ }
+ return lhs->path() == rhs->path() && lhs->endpoint() == rhs->endpoint();
+}
+
+NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings);
+
+TFederatedWriteSession::TFederatedWriteSession(const TFederatedWriteSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSetttings,
+ std::shared_ptr<TFederatedDbObserver> observer)
+ : Settings(settings)
+ , Connections(std::move(connections))
+ , SubClientSetttings(FromFederated(clientSetttings))
+ , Observer(std::move(observer))
+ , AsyncInit(Observer->WaitForFirstState())
+ , FederationState(nullptr)
+ , ClientEventsQueue(std::make_shared<NTopic::TWriteSessionEventsQueue>(Settings))
+ , BufferFreeSpace(Settings.MaxMemoryUsage_)
+{
+}
+
+void TFederatedWriteSession::Start() {
+ // TODO validate settings?
+ with_lock(Lock) {
+ ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
+ ClientHasToken = true;
+ }
+
+ AsyncInit.Subscribe([self = shared_from_this()](const auto& f){
+ Y_UNUSED(f);
+ with_lock(self->Lock) {
+ self->FederationState = self->Observer->GetState();
+ self->OnFederatedStateUpdateImpl();
+ }
+ });
+}
+
+void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {
+ if (Subsession) {
+ PendingToken.Clear();
+ Subsession->Close(TDuration::Zero());
+ }
+ NTopic::TTopicClientSettings clientSettings = SubClientSetttings;
+ clientSettings
+ .Database(db->path())
+ .DiscoveryEndpoint(db->endpoint());
+ auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, clientSettings);
+
+ auto handlers = NTopic::TWriteSessionSettings::TEventHandlers()
+ .HandlersExecutor(Settings.EventHandlers_.HandlersExecutor_)
+ .ReadyToAcceptHander([self = shared_from_this()](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev){
+ TDeferredWrite deferred(self->Subsession);
+ with_lock(self->Lock) {
+ Y_ABORT_UNLESS(self->PendingToken.Empty());
+ self->PendingToken = std::move(ev.ContinuationToken);
+ self->PrepareDeferredWrite(deferred);
+ }
+ deferred.DoWrite();
+ })
+ .AcksHandler([self = shared_from_this()](NTopic::TWriteSessionEvent::TAcksEvent& ev){
+ with_lock(self->Lock) {
+ Y_ABORT_UNLESS(ev.Acks.size() <= self->OriginalMessagesToGetAck.size());
+ for (size_t i = 0; i < ev.Acks.size(); ++i) {
+ self->BufferFreeSpace += self->OriginalMessagesToGetAck.front().Data.size();
+ self->OriginalMessagesToGetAck.pop_front();
+ }
+ self->ClientEventsQueue->PushEvent(std::move(ev));
+ if (self->BufferFreeSpace > 0 && !self->ClientHasToken) {
+ self->ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
+ self->ClientHasToken = true;
+ }
+ }
+ })
+ .SessionClosedHandler([self = shared_from_this()](const NTopic::TSessionClosedEvent & ev){
+ with_lock(self->Lock) {
+ self->ClientEventsQueue->PushEvent(ev);
+ }
+ });
+
+ NTopic::TWriteSessionSettings wsSettings = Settings;
+ wsSettings
+ // .MaxMemoryUsage(Settings.MaxMemoryUsage_) // to fix if split not by half on creation
+ .EventHandlers(handlers);
+
+ Subsession = subclient->CreateWriteSession(wsSettings);
+ CurrentDatabase = db;
+}
+
+std::shared_ptr<TDbInfo> TFederatedWriteSession::SelectDatabaseImpl() {
+ std::vector<std::shared_ptr<TDbInfo>> availableDatabases;
+ ui64 totalWeight = 0;
+
+ for (const auto& db : FederationState->DbInfos) {
+ if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
+ continue;
+ }
+
+ if (Settings.PreferredDatabase_ && (AsciiEqualsIgnoreCase(db->name(), *Settings.PreferredDatabase_) ||
+ AsciiEqualsIgnoreCase(db->id(), *Settings.PreferredDatabase_))) {
+ return db;
+ } else if (AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) {
+ return db;
+ } else {
+ availableDatabases.push_back(db);
+ totalWeight += db->weight();
+ }
+ }
+
+ if (availableDatabases.empty() || totalWeight == 0) {
+ // close session, return error
+ return nullptr;
+ }
+
+ std::sort(availableDatabases.begin(), availableDatabases.end(), [](const std::shared_ptr<TDbInfo>& lhs, const std::shared_ptr<TDbInfo>& rhs){
+ return lhs->weight() > rhs->weight()
+ || lhs->weight() == rhs->weight() && lhs->name() < rhs->name();
+ });
+
+ ui64 hashValue = THash<TString>()(Settings.Path_);
+ hashValue = CombineHashes(hashValue, THash<TString>()(Settings.ProducerId_));
+ hashValue %= totalWeight;
+
+ ui64 borderWeight = 0;
+ for (const auto& db : availableDatabases) {
+ borderWeight += db->weight();
+ if (hashValue < borderWeight) {
+ return db;
+ }
+ }
+ Y_UNREACHABLE();
+}
+
+void TFederatedWriteSession::OnFederatedStateUpdateImpl() {
+ if (!FederationState->Status.IsSuccess()) {
+ CloseImpl(FederationState->Status.GetStatus(), NYql::TIssues{});
+ return;
+ }
+
+ Y_ABORT_UNLESS(!FederationState->DbInfos.empty());
+
+ auto preferrableDb = SelectDatabaseImpl();
+
+ if (!preferrableDb) {
+ CloseImpl(EStatus::UNAVAILABLE,
+ NYql::TIssues{NYql::TIssue("Fail to select database: no available database with positive weight")});
+ return;
+ }
+
+ if (!DatabasesAreSame(preferrableDb, CurrentDatabase)) {
+ OpenSubSessionImpl(preferrableDb);
+ }
+
+ ScheduleFederatedStateUpdateImpl(UPDATE_FEDERATION_STATE_DELAY);
+}
+
+void TFederatedWriteSession::ScheduleFederatedStateUpdateImpl(TDuration delay) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+ auto cb = [self = shared_from_this()](bool ok) {
+ if (ok) {
+ with_lock(self->Lock) {
+ if (self->Closing) {
+ return;
+ }
+ self->FederationState = self->Observer->GetState();
+ self->OnFederatedStateUpdateImpl();
+ }
+ }
+ };
+
+ UpdateStateDelayContext = Connections->CreateContext();
+ if (!UpdateStateDelayContext) {
+ Closing = true;
+ // TODO log DRIVER_IS_STOPPING_DESCRIPTION
+ return;
+ }
+ Connections->ScheduleCallback(delay,
+ std::move(cb),
+ UpdateStateDelayContext);
+}
+
+NThreading::TFuture<void> TFederatedWriteSession::WaitEvent() {
+ return ClientEventsQueue->WaitEvent();
+}
+
+TVector<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) {
+ return ClientEventsQueue->GetEvents(block, maxEventsCount);
+}
+
+TMaybe<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession::GetEvent(bool block) {
+ auto events = GetEvents(block, 1);
+ return events.empty() ? Nothing() : TMaybe<NTopic::TWriteSessionEvent::TEvent>{std::move(events.front())};
+}
+
+NThreading::TFuture<ui64> TFederatedWriteSession::GetInitSeqNo() {
+ return NThreading::MakeFuture<ui64>(0u);
+}
+
+void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
+ TMaybe<TInstant> createTimestamp) {
+ NTopic::TWriteMessage message{std::move(data)};
+ if (seqNo.Defined())
+ message.SeqNo(*seqNo);
+ if (createTimestamp.Defined())
+ message.CreateTimestamp(*createTimestamp);
+ return WriteInternal(std::move(token), std::move(message));
+}
+
+void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
+ return WriteInternal(std::move(token), std::move(message));
+}
+
+void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec,
+ ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) {
+ auto message = NTopic::TWriteMessage::CompressedMessage(std::move(data), codec, originalSize);
+ if (seqNo.Defined())
+ message.SeqNo(*seqNo);
+ if (createTimestamp.Defined())
+ message.CreateTimestamp(*createTimestamp);
+ return WriteInternal(std::move(token), std::move(message));
+}
+
+void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
+ return WriteInternal(std::move(token), std::move(message));
+}
+
+void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message) {
+ ClientHasToken = false;
+ if (!message.CreateTimestamp_.Defined()) {
+ message.CreateTimestamp_ = TInstant::Now();
+ }
+
+ {
+ TDeferredWrite deferred(Subsession);
+ with_lock(Lock) {
+ BufferFreeSpace -= message.Data.size();
+ OriginalMessagesToPassDown.emplace_back(std::move(message));
+
+ PrepareDeferredWrite(deferred);
+ }
+ deferred.DoWrite();
+ }
+ if (BufferFreeSpace > 0) {
+ ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
+ ClientHasToken = true;
+ }
+}
+
+bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) {
+ if (PendingToken.Empty()) {
+ return false;
+ }
+ if (OriginalMessagesToPassDown.empty()) {
+ return false;
+ }
+ OriginalMessagesToGetAck.push_back(OriginalMessagesToPassDown.front());
+ deferred.Token.ConstructInPlace(std::move(*PendingToken));
+ deferred.Message.ConstructInPlace(std::move(OriginalMessagesToPassDown.front()));
+ OriginalMessagesToPassDown.pop_front();
+ PendingToken.Clear();
+ return true;
+}
+
+void TFederatedWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
+ Closing = true;
+ if (Subsession) {
+ Subsession->Close(TDuration::Zero());
+ }
+ ClientEventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
+}
+
+bool TFederatedWriteSession::Close(TDuration timeout) {
+ if (Subsession) {
+ return Subsession->Close(timeout);
+ }
+ return true;
+}
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
new file mode 100644
index 0000000000..7fd1020387
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
@@ -0,0 +1,110 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h>
+
+#include <deque>
+
+namespace NYdb::NFederatedTopic {
+
+class TFederatedWriteSession : public NTopic::IWriteSession,
+ public NTopic::TContinuationTokenIssuer,
+ public std::enable_shared_from_this<TFederatedWriteSession> {
+ friend class TFederatedTopicClient::TImpl;
+
+public:
+ struct TDeferredWrite {
+ explicit TDeferredWrite(std::shared_ptr<NTopic::IWriteSession> writer)
+ : Writer(std::move(writer)) {
+ }
+
+ void DoWrite() {
+ if (Token.Empty() && Message.Empty()) {
+ return;
+ }
+ Y_ABORT_UNLESS(Token.Defined() && Message.Defined());
+ return Writer->Write(std::move(*Token), std::move(*Message));
+ }
+
+ std::shared_ptr<NTopic::IWriteSession> Writer;
+ TMaybe<NTopic::TContinuationToken> Token;
+ TMaybe<NTopic::TWriteMessage> Message;
+ };
+
+ TFederatedWriteSession(const TFederatedWriteSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSetttings,
+ std::shared_ptr<TFederatedDbObserver> observer);
+
+ ~TFederatedWriteSession() = default;
+
+ NThreading::TFuture<void> WaitEvent() override;
+ TMaybe<NTopic::TWriteSessionEvent::TEvent> GetEvent(bool block) override;
+ TVector<NTopic::TWriteSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount) override;
+
+ virtual NThreading::TFuture<ui64> GetInitSeqNo() override;
+
+ virtual void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override;
+
+ virtual void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override;
+
+ virtual void Write(NTopic::TContinuationToken&&, TStringBuf, TMaybe<ui64> seqNo = Nothing(),
+ TMaybe<TInstant> createTimestamp = Nothing()) override;
+
+ virtual void WriteEncoded(NTopic::TContinuationToken&&, TStringBuf, NTopic::ECodec, ui32,
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+
+ bool Close(TDuration timeout) override;
+
+ inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
+
+private:
+ void Start();
+ void OpenSubSessionImpl(std::shared_ptr<TDbInfo> db);
+
+ std::shared_ptr<TDbInfo> SelectDatabaseImpl();
+
+ void OnFederatedStateUpdateImpl();
+ void ScheduleFederatedStateUpdateImpl(TDuration delay);
+
+ void WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message);
+ bool PrepareDeferredWrite(TDeferredWrite& deferred);
+
+ void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
+
+private:
+ // For subsession creation
+ const NTopic::TFederatedWriteSessionSettings Settings;
+ std::shared_ptr<TGRpcConnectionsImpl> Connections;
+ const NTopic::TTopicClientSettings SubClientSetttings;
+
+ std::shared_ptr<TFederatedDbObserver> Observer;
+ NThreading::TFuture<void> AsyncInit;
+ std::shared_ptr<TFederatedDbState> FederationState;
+ NYdbGrpc::IQueueClientContextPtr UpdateStateDelayContext;
+
+ std::shared_ptr<TDbInfo> CurrentDatabase;
+
+ TString SessionId;
+ const TInstant StartSessionTime = TInstant::Now();
+
+ TAdaptiveLock Lock;
+
+ std::shared_ptr<NTopic::IWriteSession> Subsession;
+
+ std::shared_ptr<NTopic::TWriteSessionEventsQueue> ClientEventsQueue;
+
+ TMaybe<NTopic::TContinuationToken> PendingToken; // from Subsession
+ bool ClientHasToken = false;
+ std::deque<NTopic::TWriteMessage> OriginalMessagesToPassDown;
+ std::deque<NTopic::TWriteMessage> OriginalMessagesToGetAck;
+ i64 BufferFreeSpace;
+
+ // Exiting.
+ bool Closing = false;
+};
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make
index 8d10ac776e..5febb8f00f 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make
@@ -4,6 +4,8 @@ SRCS(
federated_read_session.h
federated_read_session.cpp
federated_read_session_event.cpp
+ federated_write_session.h
+ federated_write_session.cpp
federated_topic_impl.h
federated_topic_impl.cpp
federated_topic.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index 7b41ae11c7..a2780101c8 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(setup->GetTestTopic());
- ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ ReadSession = topicClient.CreateReadSession(readSettings);
Cerr << "Session was created" << Endl;
ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
@@ -136,7 +136,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.AppendTopics(setup->GetTestTopic());
Cerr << "Before ReadSession was created" << Endl;
- ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ ReadSession = topicClient.CreateReadSession(readSettings);
Cerr << "Session was created" << Endl;
auto f = ReadSession->WaitEvent();
@@ -230,7 +230,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(setup->GetTestTopic());
- ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ ReadSession = topicClient.CreateReadSession(readSettings);
Cerr << "Session was created" << Endl;
Sleep(TDuration::MilliSeconds(50));
@@ -342,7 +342,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(setup->GetTestTopic());
- ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ ReadSession = topicClient.CreateReadSession(readSettings);
Cerr << "Session was created" << Endl;
ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
@@ -366,7 +366,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
Cerr << ">>> Got event: " << DebugString(*event) << Endl;
UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event));
- auto ReadSession2 = topicClient.CreateFederatedReadSession(readSettings);
+ auto ReadSession2 = topicClient.CreateReadSession(readSettings);
Cerr << "Session2 was created" << Endl;
ReadSession2->WaitEvent().Wait(TDuration::Seconds(1));
@@ -407,7 +407,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(setup->GetTestTopic());
- ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ ReadSession = topicClient.CreateReadSession(readSettings);
Cerr << "Session was created" << Endl;
ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
@@ -483,7 +483,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
checkedPromise.SetValue();
});
- ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ ReadSession = topicClient.CreateReadSession(readSettings);
Cerr << ">>> Session was created" << Endl;
Sleep(TDuration::MilliSeconds(50));
@@ -520,6 +520,73 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
AtomicSet(check, 0);
}
+ Y_UNIT_TEST(BasicWriteSession) {
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(
+ TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2);
+
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver);
+
+ // Create write session.
+ auto writeSettings = NTopic::TWriteSessionSettings()
+ .Path(setup->GetTestTopic())
+ .MessageGroupId("src_id");
+
+ WriteSession = topicClient.CreateWriteSession(writeSettings);
+ Cerr << "Session was created" << Endl;
+
+ WriteSession->WaitEvent().Wait(TDuration::Seconds(1));
+ auto event = WriteSession->GetEvent(false);
+ Y_ASSERT(event);
+ Cerr << "Got new read session event: " << DebugString(*event) << Endl;
+ auto* readyToAcceptEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event);
+ Y_ASSERT(readyToAcceptEvent);
+ WriteSession->Write(std::move(readyToAcceptEvent->ContinuationToken), NTopic::TWriteMessage("hello"));
+
+ WriteSession->WaitEvent().Wait(TDuration::Seconds(1));
+ event = WriteSession->GetEvent(false);
+ Y_ASSERT(event);
+ Cerr << "Got new read session event: " << DebugString(*event) << Endl;
+
+ readyToAcceptEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event);
+ Y_ASSERT(readyToAcceptEvent);
+
+ std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ do {
+ fdsRequest = fdsMock.GetNextPendingRequest();
+ if (!fdsRequest.has_value()) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ } while (!fdsRequest.has_value());
+
+ fdsRequest->Result.SetValue(fdsMock.ComposeOkResult());
+
+ WriteSession->WaitEvent().Wait(TDuration::Seconds(1));
+ event = WriteSession->GetEvent(false);
+ Y_ASSERT(event);
+ Cerr << "Got new read session event: " << DebugString(*event) << Endl;
+
+ auto* acksEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TAcksEvent>(&*event);
+ Y_ASSERT(acksEvent);
+
+ WriteSession->Close(TDuration::MilliSeconds(10));
+ }
+
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index 91962449f9..80e154517c 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -37,7 +37,7 @@ NThreading::TFuture<void> TWriteSession::WaitEvent() {
void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize,
TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) {
- auto message = TWriteMessage::CompressedMessage(data, codec, originalSize);
+ auto message = TWriteMessage::CompressedMessage(std::move(data), codec, originalSize);
if (seqNo.Defined())
message.SeqNo(*seqNo);
if (createTimestamp.Defined())
@@ -52,7 +52,7 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& mes
void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
TMaybe<TInstant> createTimestamp) {
- TWriteMessage message{data};
+ TWriteMessage message{std::move(data)};
if (seqNo.Defined())
message.SeqNo(*seqNo);
if (createTimestamp.Defined())
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index c89cdf1eca..1a45dd1ded 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -400,7 +400,6 @@ NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() {
return EventsQueue->WaitEvent();
}
-// Client method.
void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) {
TInstant createdAtValue = message.CreateTimestamp_.Defined() ? *message.CreateTimestamp_ : TInstant::Now();
bool readyToAccept = false;
@@ -425,6 +424,7 @@ void TWriteSessionImpl::Write(TContinuationToken&& token, TWriteMessage&& messag
WriteInternal(std::move(token), std::move(message));
}
+// Client method.
void TWriteSessionImpl::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message)
{
WriteInternal(std::move(token), std::move(message));
@@ -1245,8 +1245,9 @@ void TWriteSessionImpl::SendImpl() {
// Client method, no Lock
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
- if (AtomicGet(Aborting))
+ if (AtomicGet(Aborting)) {
return false;
+ }
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout);
auto startTime = TInstant::Now();
auto remaining = closeTimeout;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 9373e9c811..8c527ac843 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -103,6 +103,7 @@ public:
private:
struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor;
+
#define DECLARE_HANDLER(type, handler, answer) \
bool operator()(type&) { \
if (this->PushHandler<type>( \
@@ -122,7 +123,6 @@ private:
bool Visit() {
return std::visit(*this, Event);
}
-
};
bool ApplyHandler(TEventInfo& eventInfo) {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index cdef92ad5c..309749aa1f 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -1236,7 +1236,6 @@ struct TWriteSessionEvent {
TMaybe<TWrittenMessageDetails> Details;
//! Write stats from server. See TWriteStat. nullptr for DISCARDED event.
TWriteStat::TPtr Stat;
-
};
struct TAcksEvent : public TPrintable<TAcksEvent> {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index 8b4dc03613..d0b5320578 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -143,18 +143,24 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.Path(TEST_TOPIC)
.ProducerId(TEST_MESSAGE_GROUP_ID)
.MessageGroupId(TEST_MESSAGE_GROUP_ID);
+ Cerr << ">>> open write session " << i << Endl;
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
UNIT_ASSERT(writeSession->Write("message_using_MessageGroupId"));
+ Cerr << ">>> write session " << i << " message written" << Endl;
writeSession->Close();
+ Cerr << ">>> write session " << i << " closed" << Endl;
}
{
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.ProducerId(TEST_MESSAGE_GROUP_ID)
.PartitionId(0);
+ Cerr << ">>> open write session 100" << Endl;
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
UNIT_ASSERT(writeSession->Write("message_using_PartitionId"));
+ Cerr << ">>> write session 100 message written" << Endl;
writeSession->Close();
+ Cerr << ">>> write session 100 closed" << Endl;
}
{