aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2022-09-05 17:24:03 +0300
committerildar-khisam <ikhis@ydb.tech>2022-09-05 17:24:03 +0300
commitf1ee295b388568dbcc56001aafe25ca1711ee49e (patch)
tree5802f4ab2235c2a189471f4ffcf778ce64f65cf8
parent2a5400d4e3fc4b33e4217f4bff5ccdf452365d64 (diff)
downloadydb-f1ee295b388568dbcc56001aafe25ca1711ee49e.tar.gz
topic writes in sdk
better implement topic writes
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp18
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp24
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt22
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp71
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h16
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp48
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp1219
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h464
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h286
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt2
18 files changed, 2165 insertions, 42 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index bc57fa2953c..649fc817e67 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -544,9 +544,9 @@ public:
, Session(std::move(parentSession))
, ErrorHandler(std::move(errorHandler))
{
- TAPartitionStream<false>::PartitionSessionId = static_cast<i64>(partitionStreamId);
+ TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
TAPartitionStream<false>::TopicPath = std::move(topicPath);
- TAPartitionStream<false>::PartitionId = partitionId;
+ TAPartitionStream<false>::PartitionId = static_cast<ui64>(partitionId);
MaxCommittedOffset = static_cast<ui64>(readOffset);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 4b606e03521..01a699a4129 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -22,10 +22,20 @@
#include <variant>
+// Forward delcarations
+
namespace NYdb::NTopic {
class TReadSession;
}
+namespace NYdb::NPersQueue::NCompressionDetails {
+ extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data);
+}
+
+namespace NYdb::NTopic::NCompressionDetails {
+ extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec);
+}
+
namespace NYdb::NPersQueue {
static const bool RangesMode = !GetEnv("PQ_OFFSET_RANGES_MODE").empty();
@@ -2179,12 +2189,6 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompression
, Ready(ready) {
}
-// Forward delcaration
-namespace NCompressionDetails {
- extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data);
- extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec);
-}
-
template<bool UseMigrationProtocol>
void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator()() {
i64 minOffset = Max<i64>();
@@ -2224,7 +2228,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
&& static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_RAW
&& static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_UNSPECIFIED
) {
- TString decompressed = NCompressionDetails::Decompress(data, static_cast<Ydb::Topic::Codec>(batch.codec()));
+ TString decompressed = ::NYdb::NTopic::NCompressionDetails::Decompress(data, static_cast<Ydb::Topic::Codec>(batch.codec()));
data.set_data(decompressed);
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp
index 51c16f84f7a..bc2de2a1c50 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp
@@ -25,21 +25,6 @@ IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage,
}
}
-IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, Ydb::Topic::Codec codec, IInputStream* origin) {
- switch (codec) {
- case Ydb::Topic::CODEC_GZIP:
- return &inputStreamStorage.emplace<TZLibDecompress>(origin);
- case Ydb::Topic::CODEC_LZOP:
- throw yexception() << "LZO codec is disabled";
- case Ydb::Topic::CODEC_ZSTD:
- return &inputStreamStorage.emplace<TZstdDecompress>(origin);
- default:
- //case Ydb::Topic::CODEC_RAW:
- //case Ydb::Topic::CODEC_UNSPECIFIED:
- throw yexception() << "unsupported codec value : " << ui64(codec);
- }
-}
-
TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data) {
TMemoryInput input(data.data().data(), data.data().size());
TString result;
@@ -49,15 +34,6 @@ TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage
return result;
}
-TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec) {
- TMemoryInput input(data.data().data(), data.data().size());
- TString result;
- TStringOutput resultOutput(result);
- TInputStreamVariant inputStreamStorage;
- TransferData(CreateDecompressorStream(inputStreamStorage, codec, &input), &resultOutput);
- return result;
-}
-
class TZLibToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZLibCompress {
public:
TZLibToStringCompressor(TBuffer& dst, ZLib::StreamType type, size_t quality)
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h
index b18d14119e1..5b6b111c1e6 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h
@@ -9,7 +9,6 @@ namespace NYdb::NPersQueue {
namespace NCompressionDetails {
extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data);
-extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec);
THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt
index 6fc60fb0c0b..24274dcbeef 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(codecs)
add_subdirectory(impl)
add_subdirectory(ut)
@@ -14,11 +15,14 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC
contrib-libs-cxxsupp
yutil
tools-enum_parser-enum_serialization_runtime
- client-ydb_persqueue_public-codecs
+ client-ydb_topic-codecs
library-cpp-retry
client-ydb_topic-impl
cpp-client-ydb_proto
cpp-client-ydb_driver
+ api-grpc
+ api-grpc-draft
+ api-protos
)
target_sources(cpp-client-ydb_topic PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt
new file mode 100644
index 00000000000..7bbe8147995
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(client-ydb_topic-codecs)
+target_link_libraries(client-ydb_topic-codecs PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-streams-zstd
+ public-issue-protos
+ api-grpc-draft
+ api-grpc
+ api-protos
+)
+target_sources(client-ydb_topic-codecs PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp
new file mode 100644
index 00000000000..44252130fda
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp
@@ -0,0 +1,71 @@
+#include <library/cpp/streams/zstd/zstd.h>
+#include <util/stream/buffer.h>
+#include <util/stream/zlib.h>
+#include <util/stream/mem.h>
+
+#include "codecs.h"
+
+namespace NYdb::NTopic {
+namespace NCompressionDetails {
+
+using TInputStreamVariant = std::variant<std::monostate, TZLibDecompress, TZstdDecompress>;
+
+IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, Ydb::Topic::Codec codec, IInputStream* origin) {
+ switch (codec) {
+ case Ydb::Topic::CODEC_GZIP:
+ return &inputStreamStorage.emplace<TZLibDecompress>(origin);
+ case Ydb::Topic::CODEC_LZOP:
+ throw yexception() << "LZO codec is disabled";
+ case Ydb::Topic::CODEC_ZSTD:
+ return &inputStreamStorage.emplace<TZstdDecompress>(origin);
+ default:
+ //case Ydb::Topic::CODEC_RAW:
+ //case Ydb::Topic::CODEC_UNSPECIFIED:
+ throw yexception() << "unsupported codec value : " << ui64(codec);
+ }
+}
+
+TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec) {
+ TMemoryInput input(data.data().data(), data.data().size());
+ TString result;
+ TStringOutput resultOutput(result);
+ TInputStreamVariant inputStreamStorage;
+ TransferData(CreateDecompressorStream(inputStreamStorage, codec, &input), &resultOutput);
+ return result;
+}
+
+class TZLibToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZLibCompress {
+public:
+ TZLibToStringCompressor(TBuffer& dst, ZLib::StreamType type, size_t quality)
+ : TEmbedPolicy<TBufferOutput>(dst)
+ , TZLibCompress(TEmbedPolicy::Ptr(), type, quality)
+ {
+ }
+};
+
+class TZstdToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZstdCompress {
+public:
+ TZstdToStringCompressor(TBuffer& dst, int quality)
+ : TEmbedPolicy<TBufferOutput>(dst)
+ , TZstdCompress(TEmbedPolicy::Ptr(), quality)
+ {
+ }
+};
+
+THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality) {
+ switch (codec) {
+ case ECodec::GZIP:
+ return MakeHolder<TZLibToStringCompressor>(result, ZLib::GZip, quality >= 0 ? quality : 6);
+ case ECodec::LZOP:
+ throw yexception() << "LZO codec is disabled";
+ case ECodec::ZSTD:
+ return MakeHolder<TZstdToStringCompressor>(result, quality);
+ default:
+ Y_FAIL("NOT IMPLEMENTED CODEC TYPE");
+ }
+}
+
+
+} // namespace NDecompressionDetails
+
+} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h
new file mode 100644
index 00000000000..79047730b89
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h
@@ -0,0 +1,16 @@
+#pragma once
+#include <util/stream/output.h>
+#include <ydb/public/api/protos/ydb_topic.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+
+namespace NYdb::NTopic {
+namespace NCompressionDetails {
+
+extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec);
+
+THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality);
+
+} // namespace NDecompressionDetails
+
+} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
index e138e1085f5..8e8d6bdb145 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
@@ -13,9 +13,11 @@ target_link_libraries(client-ydb_topic-impl PUBLIC
yutil
cpp-grpc-client
cpp-monlib-dynamic_counters
+ cpp-monlib-metrics
cpp-string_utils-url
library-persqueue-obfuscate
api-grpc-draft
+ api-grpc
impl-ydb_internal-make_request
client-ydb_common_client-impl
cpp-client-ydb_driver
@@ -29,6 +31,7 @@ target_sources(client-ydb_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/deferred_commit.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp
index 368df898ac6..18bfccf8a49 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp
@@ -88,4 +88,9 @@ TThreadPoolExecutor::TThreadPoolExecutor(size_t threadsCount)
ThreadsCount = threadsCount;
}
+IExecutor::TPtr CreateSyncExecutor()
+{
+ return MakeIntrusive<TSyncExecutor>();
+}
+
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index c82668f104d..f6abfca8226 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -234,4 +234,12 @@ std::shared_ptr<IReadSession> TTopicClient::CreateReadSession(const TReadSession
return Impl_->CreateReadSession(settings);
}
+std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings) {
+ return Impl_->CreateSimpleWriteSession(settings);
+}
+
+std::shared_ptr<IWriteSession> TTopicClient::CreateWriteSession(const TWriteSessionSettings& settings) {
+ return Impl_->CreateWriteSession(settings);
+}
+
} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp
index b18aab25301..7233e9b945b 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp
@@ -4,6 +4,7 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
#include "read_session.h"
+#include "write_session.h"
namespace NYdb::NTopic {
@@ -23,7 +24,45 @@ std::shared_ptr<IReadSession> TTopicClient::TImpl::CreateReadSession(const TRead
auto session = std::make_shared<TReadSession>(maybeSettings.GetOrElse(settings), shared_from_this(), Connections_, DbDriverState_);
session->Start();
return std::move(session);
- // return std::make_shared<TDummyReadSession>(settings);
+}
+
+std::shared_ptr<IWriteSession> TTopicClient::TImpl::CreateWriteSession(
+ const TWriteSessionSettings& settings
+) {
+ TMaybe<TWriteSessionSettings> maybeSettings;
+ if (!settings.CompressionExecutor_ || !settings.EventHandlers_.HandlersExecutor_) {
+ maybeSettings = settings;
+ with_lock (Lock) {
+ if (!settings.CompressionExecutor_) {
+ maybeSettings->CompressionExecutor(Settings.DefaultCompressionExecutor_);
+ }
+ if (!settings.EventHandlers_.HandlersExecutor_) {
+ maybeSettings->EventHandlers_.HandlersExecutor(Settings.DefaultHandlersExecutor_);
+ }
+ }
+ }
+ auto session = std::make_shared<TWriteSession>(
+ maybeSettings.GetOrElse(settings), shared_from_this(), Connections_, DbDriverState_
+ );
+ session->Start(TDuration::Zero());
+ return std::move(session);
+}
+
+std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::TImpl::CreateSimpleWriteSession(
+ const TWriteSessionSettings& settings
+) {
+ auto alteredSettings = settings;
+ with_lock (Lock) {
+ alteredSettings.EventHandlers_.HandlersExecutor(Settings.DefaultHandlersExecutor_);
+ if (!settings.CompressionExecutor_) {
+ alteredSettings.CompressionExecutor(Settings.DefaultCompressionExecutor_);
+ }
+ }
+
+ auto session = std::make_shared<TSimpleBlockingWriteSession>(
+ alteredSettings, shared_from_this(), Connections_, DbDriverState_
+ );
+ return std::move(session);
}
std::shared_ptr<TTopicClient::TImpl::IReadSessionConnectionProcessorFactory> TTopicClient::TImpl::CreateReadSessionConnectionProcessorFactory() {
@@ -33,4 +72,11 @@ std::shared_ptr<TTopicClient::TImpl::IReadSessionConnectionProcessorFactory> TTo
return NPersQueue::CreateConnectionProcessorFactory<TService, TRequest, TResponse>(&TService::Stub::AsyncStreamRead, Connections_, DbDriverState_);
}
+std::shared_ptr<TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory> TTopicClient::TImpl::CreateWriteSessionConnectionProcessorFactory() {
+ using TService = Ydb::Topic::V1::TopicService;
+ using TRequest = Ydb::Topic::StreamWriteMessage::FromClient;
+ using TResponse = Ydb::Topic::StreamWriteMessage::FromServer;
+ return NPersQueue::CreateConnectionProcessorFactory<TService, TRequest, TResponse>(&TService::Stub::AsyncStreamWrite, Connections_, DbDriverState_);
+}
+
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
index 15589ab1601..f24800c9331 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -201,6 +201,8 @@ public:
// Runtime API.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
+ std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings);
+ std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings);
using IReadSessionConnectionProcessorFactory =
NYdb::NPersQueue::ISessionConnectionProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient,
@@ -208,6 +210,12 @@ public:
std::shared_ptr<IReadSessionConnectionProcessorFactory> CreateReadSessionConnectionProcessorFactory();
+ using IWriteSessionConnectionProcessorFactory =
+ NYdb::NPersQueue::ISessionConnectionProcessorFactory<Ydb::Topic::StreamWriteMessage::FromClient,
+ Ydb::Topic::StreamWriteMessage::FromServer>;
+
+ std::shared_ptr<IWriteSessionConnectionProcessorFactory> CreateWriteSessionConnectionProcessorFactory();
+
NGrpc::IQueueClientContextPtr CreateContext() {
return Connections_->CreateContext();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
new file mode 100644
index 00000000000..83103a1ec98
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -0,0 +1,1219 @@
+#include "write_session.h"
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <library/cpp/string_utils/url/url.h>
+
+#include <google/protobuf/util/time_util.h>
+
+#include <util/generic/store_policy.h>
+#include <util/generic/utility.h>
+#include <util/stream/buffer.h>
+
+
+namespace NYdb::NTopic {
+using ::NMonitoring::TDynamicCounterPtr;
+using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr;
+
+
+const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1);
+
+namespace NCompressionDetails {
+ THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality);
+}
+
+#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
+TWriterCounters::TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
+ Errors = counters->GetCounter("errors", true);
+ CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false);
+ BytesWritten = counters->GetCounter("bytesWritten", true);
+ MessagesWritten = counters->GetCounter("messagesWritten", true);
+ BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true);
+ BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false);
+ BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false);
+ BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false);
+ MessagesInflight = counters->GetCounter("messagesInflight", false);
+
+ TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP);
+ UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP);
+ CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP);
+}
+#undef HISTOGRAM_SETUP
+
+TWriteSession::TWriteSession(
+ const TWriteSessionSettings& settings,
+ std::shared_ptr<TTopicClient::TImpl> client,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ TDbDriverStatePtr dbDriverState)
+ : Settings(settings)
+ , Client(std::move(client))
+ , Connections(std::move(connections))
+ , DbDriverState(std::move(dbDriverState))
+ , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "")
+ , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings))
+ , InitSeqNoPromise(NThreading::NewPromise<ui64>())
+ , WakeupInterval(
+ Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ?
+ std::min(Settings.BatchFlushInterval_.GetOrElse(TDuration::Seconds(1)) / 5, TDuration::MilliSeconds(100))
+ :
+ TDuration::MilliSeconds(100)
+ )
+{
+ if (!Settings.RetryPolicy_) {
+ Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy();
+ }
+ if (Settings.Counters_.Defined()) {
+ Counters = *Settings.Counters_;
+ } else {
+ Counters = MakeIntrusive<TWriterCounters>(new ::NMonitoring::TDynamicCounters());
+ }
+
+}
+
+void TWriteSession::Start(const TDuration& delay) {
+ ++ConnectionAttemptsDone;
+ if (!Started) {
+ HandleWakeUpImpl();
+ InitWriter();
+ }
+ Started = true;
+
+ DoConnect(delay, DbDriverState->DiscoveryEndpoint);
+}
+
+// Only called under lock
+TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& status) {
+ THandleResult result;
+ if (AtomicGet(Aborting)) {
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
+ return result;
+ }
+ DbDriverState->Log.Write(
+ TLOG_INFO,
+ LogPrefix() << "Got error. Status: " << status.Status
+ << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues)
+ );
+ SessionEstablished = false;
+ TMaybe<TDuration> nextDelay = TDuration::Zero();
+ if (!RetryState) {
+ RetryState = Settings.RetryPolicy_->CreateRetryState();
+ }
+ nextDelay = RetryState->GetNextRetryDelay(status.Status);
+
+ if (nextDelay) {
+ result.StartDelay = *nextDelay;
+ result.DoRestart = true;
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms"
+ );
+ ResetForRetryImpl();
+
+ } else {
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
+ result.DoStop = true;
+ CheckHandleResultImpl(result);
+ }
+ return result;
+}
+
+void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet as well.
+ CompressionExecutor = Settings.CompressionExecutor_;
+ IExecutor::TPtr executor;
+ executor = CreateSyncExecutor();
+ executor->Start();
+ Executor = std::move(executor);
+
+ Settings.CompressionExecutor_->Start();
+ Settings.EventHandlers_.HandlersExecutor_->Start();
+
+}
+// Client method
+NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() {
+ if (Settings.ValidateSeqNo_) {
+ if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) {
+ DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
+ ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode");
+ }
+ else
+ AutoSeqNoMode = false;
+ }
+ return InitSeqNoPromise.GetFuture();
+}
+
+TString DebugString(const TWriteSessionEvent::TEvent& event) {
+ return std::visit([](const auto& ev) { return ev.DebugString(); }, event);
+}
+
+// Client method
+TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) {
+ return EventsQueue->GetEvent(block);
+}
+
+// Client method
+TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) {
+ return EventsQueue->GetEvents(block, maxEventsCount);
+}
+
+// Only called under lock
+ui64 TWriteSession::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
+ ui64 seqNoValue = LastSeqNo + 1;
+ if (!AutoSeqNoMode.Defined()) {
+ AutoSeqNoMode = !seqNo.Defined();
+ //! Disable SeqNo shift for manual SeqNo mode;
+ if (seqNo.Defined()) {
+ OnSeqNoShift = false;
+ SeqNoShift = 0;
+ }
+ }
+ if (seqNo.Defined()) {
+ if (*AutoSeqNoMode) {
+ DbDriverState->Log.Write(
+ TLOG_ERR,
+ LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
+ );
+ ThrowFatalError(
+ "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
+ );
+ } else {
+ seqNoValue = *seqNo;
+ }
+ } else if (!(*AutoSeqNoMode)) {
+ DbDriverState->Log.Write(
+ TLOG_ERR,
+ LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
+ );
+ ThrowFatalError(
+ "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
+ );
+ }
+ LastSeqNo = seqNoValue;
+ return seqNoValue;
+}
+inline void TWriteSession::CheckHandleResultImpl(THandleResult& result) {
+ result.DoSetSeqNo = result.DoStop && !InitSeqNoSetDone && (InitSeqNoSetDone = true);
+}
+
+void TWriteSession::ProcessHandleResult(THandleResult& result) {
+ if (result.DoRestart) {
+ Start(result.StartDelay);
+ } else if (result.DoSetSeqNo) {
+ InitSeqNoPromise.SetException("session closed");
+ }
+}
+
+NThreading::TFuture<void> TWriteSession::WaitEvent() {
+ return EventsQueue->WaitEvent();
+}
+
+// Client method.
+void TWriteSession::WriteInternal(
+ TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp
+ ) {
+ TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now();
+ bool readyToAccept = false;
+ size_t bufferSize = data.size();
+ with_lock(Lock) {
+ CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize);
+
+ FlushWriteIfRequiredImpl();
+ readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
+ }
+ if (readyToAccept) {
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ }
+}
+
+// Client method.
+void TWriteSession::WriteEncoded(
+ TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp
+ ) {
+ WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp);
+}
+
+void TWriteSession::Write(
+ TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp
+ ) {
+ WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp);
+}
+
+
+// Only called under lock.
+TWriteSession::THandleResult TWriteSession::OnErrorImpl(NYdb::TPlainStatus&& status) {
+ (*Counters->Errors)++;
+ auto result = RestartImpl(status);
+ if (result.DoStop) {
+ CloseImpl(status.Status, std::move(status.Issues));
+ }
+ return result;
+}
+
+// No lock
+void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) {
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
+
+ NGrpc::IQueueClientContextPtr prevConnectContext;
+ NGrpc::IQueueClientContextPtr prevConnectTimeoutContext;
+ NGrpc::IQueueClientContextPtr prevConnectDelayContext;
+ NGrpc::IQueueClientContextPtr connectContext = nullptr;
+ NGrpc::IQueueClientContextPtr connectDelayContext = nullptr;
+ NGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr;
+ TRpcRequestSettings reqSettings;
+ std::shared_ptr<IWriteSessionConnectionProcessorFactory> connectionFactory;
+ with_lock(Lock) {
+ ++ConnectionGeneration;
+ auto subclient = Client;
+ connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
+ auto clientContext = subclient->CreateContext();
+ ConnectionFactory = connectionFactory;
+
+ ClientContext = std::move(clientContext);
+ ServerMessage = std::make_shared<TServerMessage>();
+
+ if (!ClientContext) {
+ AbortImpl();
+ // Grpc and WriteSession is closing right now.
+ return;
+ }
+
+ connectContext = ClientContext->CreateContext();
+ if (delay)
+ connectDelayContext = ClientContext->CreateContext();
+ connectTimeoutContext = ClientContext->CreateContext();
+
+ // Previous operations contexts.
+
+ // Set new context
+ prevConnectContext = std::exchange(ConnectContext, connectContext);
+ prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext);
+ prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext);
+ Y_ASSERT(ConnectContext);
+ Y_ASSERT(ConnectTimeoutContext);
+
+ // Cancel previous operations.
+ NPersQueue::Cancel(prevConnectContext);
+ if (prevConnectDelayContext)
+ NPersQueue::Cancel(prevConnectDelayContext);
+ NPersQueue::Cancel(prevConnectTimeoutContext);
+ Y_ASSERT(connectContext);
+ Y_ASSERT(connectTimeoutContext);
+ reqSettings = TRpcRequestSettings::Make(Settings);
+ }
+ auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext]
+ (TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
+ if (auto sharedThis = weakThis.lock()) {
+ sharedThis->OnConnect(std::move(st), std::move(processor), connectContext);
+ }
+ };
+
+ auto connectTimeoutCallback = [weakThis = weak_from_this(), connectTimeoutContext = connectTimeoutContext]
+ (bool ok) {
+ if (ok) {
+ if (auto sharedThis = weakThis.lock()) {
+ sharedThis->OnConnectTimeout(connectTimeoutContext);
+ }
+ }
+ };
+
+ connectionFactory->CreateProcessor(
+ std::move(connectCallback),
+ reqSettings,
+ std::move(connectContext),
+ TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting.
+ std::move(connectTimeoutContext),
+ std::move(connectTimeoutCallback),
+ delay,
+ std::move(connectDelayContext)
+ );
+}
+
+// RPC callback.
+void TWriteSession::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) {
+ DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout");
+ THandleResult handleResult;
+ with_lock (Lock) {
+ if (ConnectTimeoutContext == connectTimeoutContext) {
+ NPersQueue::Cancel(ConnectContext);
+ ConnectContext = nullptr;
+ ConnectTimeoutContext = nullptr;
+ ConnectDelayContext = nullptr;
+ } else {
+ return;
+ }
+ TStringBuilder description;
+ description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone;
+ handleResult = RestartImpl(TPlainStatus(EStatus::TIMEOUT, description));
+ if (handleResult.DoStop) {
+ CloseImpl(
+ EStatus::TIMEOUT,
+ description
+ );
+ }
+ }
+ ProcessHandleResult(handleResult);
+}
+
+// RPC callback.
+void TWriteSession::OnConnect(
+ TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext
+) {
+ THandleResult handleResult;
+ with_lock (Lock) {
+ if (ConnectContext == connectContext) {
+ NPersQueue::Cancel(ConnectTimeoutContext);
+ ConnectContext = nullptr;
+ ConnectTimeoutContext = nullptr;
+ ConnectDelayContext = nullptr;
+
+ if (st.Ok()) {
+ Processor = std::move(processor);
+ InitImpl();
+ // Still should call ReadFromProcessor();
+ }
+ } else {
+ return;
+ }
+ if (!st.Ok()) {
+ handleResult = RestartImpl(st);
+ if (handleResult.DoStop) {
+ CloseImpl(
+ st.Status,
+ NPersQueue::MakeIssueWithSubIssues(
+ TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint
+ << "\". Attempts done: " << ConnectionAttemptsDone,
+ st.Issues
+ )
+ );
+ }
+ }
+ }
+ if (st.Ok())
+ ReadFromProcessor(); // Out of Init
+ ProcessHandleResult(handleResult);
+}
+
+// Produce init request for session.
+// Only called under lock.
+void TWriteSession::InitImpl() {
+ TClientMessage req;
+ auto* init = req.mutable_init_request();
+ init->set_path(Settings.Path_);
+ init->set_producer_id(Settings.ProducerId_);
+ init->set_message_group_id(Settings.MessageGroupId_);
+
+ for (const auto& attr : Settings.Meta_.Fields) {
+ (*init->mutable_write_session_meta())[attr.first] = attr.second;
+ }
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
+ WriteToProcessorImpl(std::move(req));
+}
+
+// Called under lock. Invokes Processor->Write, which is assumed to be deadlock-safe
+void TWriteSession::WriteToProcessorImpl(TWriteSession::TClientMessage&& req) {
+ Y_ASSERT(Processor);
+ if (Aborting)
+ return;
+ auto callback = [weakThis = weak_from_this(), connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) {
+ if (auto sharedThis = weakThis.lock()) {
+ sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration);
+ }
+ };
+
+ Processor->Write(std::move(req), callback);
+}
+
+void TWriteSession::ReadFromProcessor() {
+ Y_ASSERT(Processor);
+ IProcessor::TPtr prc;
+ ui64 generation;
+ with_lock(Lock) {
+ prc = Processor;
+ generation = ConnectionGeneration;
+ }
+ auto callback = [weakThis = weak_from_this(), connectionGeneration = generation, processor = prc, serverMessage = ServerMessage]
+ (NGrpc::TGrpcStatus&& grpcStatus) {
+ if (auto sharedThis = weakThis.lock()) {
+ sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ }
+ };
+ prc->Read(ServerMessage.get(), std::move(callback));
+}
+
+void TWriteSession::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) {
+ THandleResult handleResult;
+ with_lock (Lock) {
+ if (connectionGeneration != ConnectionGeneration) {
+ return; // Message from previous connection. Ignore.
+ }
+ if (Aborting) {
+ return;
+ }
+ if(!status.Ok()) {
+ handleResult = OnErrorImpl(status);
+ }
+ }
+ ProcessHandleResult(handleResult);
+}
+
+void TWriteSession::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) {
+ TPlainStatus errorStatus;
+ TProcessSrvMessageResult processResult;
+ bool needSetValue = false;
+ if (!grpcStatus.Ok()) {
+ errorStatus = TPlainStatus(std::move(grpcStatus));
+ }
+ bool doRead = false;
+ with_lock (Lock) {
+ UpdateTimedCountersImpl();
+ if (connectionGeneration != ConnectionGeneration) {
+ return; // Message from previous connection. Ignore.
+ }
+ if (errorStatus.Ok()) {
+ if (NPersQueue::IsErrorMessage(*ServerMessage)) {
+ errorStatus = NPersQueue::MakeErrorFromProto(*ServerMessage);
+ } else {
+ processResult = ProcessServerMessageImpl();
+ needSetValue = !InitSeqNoSetDone && processResult.InitSeqNo.Defined() && (InitSeqNoSetDone = true);
+ if (errorStatus.Ok() && processResult.Ok) {
+ doRead = true;
+ }
+ }
+ }
+ }
+ if (doRead)
+ ReadFromProcessor();
+
+ with_lock(Lock) {
+ if (!errorStatus.Ok()) {
+ if (processResult.Ok) { // Otherwise, OnError was already called
+ processResult.HandleResult = RestartImpl(errorStatus);
+ }
+ }
+ if (processResult.HandleResult.DoStop) {
+ CloseImpl(std::move(errorStatus));
+ }
+ }
+ for (auto& event : processResult.Events) {
+ EventsQueue->PushEvent(std::move(event));
+ }
+ if (needSetValue) {
+ InitSeqNoPromise.SetValue(*processResult.InitSeqNo);
+ processResult.HandleResult.DoSetSeqNo = false; // Redundant. Just in case.
+ }
+ ProcessHandleResult(processResult.HandleResult);
+}
+
+TStringBuilder TWriteSession::LogPrefix() const {
+ return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] ";
+}
+
+TString TWriteSessionEvent::TAcksEvent::DebugString() const {
+ TStringBuilder res;
+ res << "AcksEvent:";
+ for (auto& ack : Acks) {
+ res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State;
+ if (ack.Details) {
+ res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId;
+ }
+ res << " }";
+ }
+ if (!Acks.empty() && Acks.back().Stat) {
+ auto& stat = Acks.back().Stat;
+ res << " write stat: Write time " << stat->WriteTime
+ << " minimal time in partition queue " << stat->MinTimeInPartitionQueue
+ << " maximal time in partition queue " << stat->MaxTimeInPartitionQueue
+ << " partition quoted time " << stat->PartitionQuotedTime
+ << " topic quoted time " << stat->TopicQuotedTime;
+ }
+ return res;
+}
+
+TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const {
+ return "ReadyToAcceptEvent";
+}
+
+
+TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl() {
+ TProcessSrvMessageResult result;
+ switch (ServerMessage->GetServerMessageCase()) {
+ case TServerMessage::SERVER_MESSAGE_NOT_SET: {
+ SessionEstablished = false;
+ result.HandleResult = OnErrorImpl({
+ static_cast<NYdb::EStatus>(ServerMessage->status()),
+ {NYql::TIssue{ServerMessage->DebugString()}}
+ });
+ result.Ok = false;
+ break;
+ }
+ case TServerMessage::kInitResponse: {
+ const auto& initResponse = ServerMessage->init_response();
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
+ SessionId = initResponse.session_id();
+ PartitionId = initResponse.partition_id();
+ ui64 newLastSeqNo = initResponse.last_seq_no();
+ // SeqNo increased, so there's a risk of loss, apply SeqNo shift.
+ // MinUnsentSeqNo must be > 0 if anything was ever sent yet
+ if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) {
+ SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
+ }
+ result.InitSeqNo = newLastSeqNo;
+ LastSeqNo = newLastSeqNo;
+
+ SessionEstablished = true;
+ LastCountersUpdateTs = TInstant::Now();
+ SessionStartedTs = TInstant::Now();
+ OnErrorResolved();
+
+ //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ // Kickstart send after session reestablishment
+ SendImpl();
+ break;
+ }
+ case TServerMessage::kWriteResponse: {
+ TWriteSessionEvent::TAcksEvent acksEvent;
+ const auto& batchWriteResponse = ServerMessage->write_response();
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString()
+ );
+ TWriteStat::TPtr writeStat = new TWriteStat{};
+ const auto& stat = batchWriteResponse.write_statistics();
+
+ auto durationConv = [](const ::google::protobuf::Duration& dur) {
+ return TDuration::MilliSeconds(::google::protobuf::util::TimeUtil::DurationToMilliseconds(dur));
+ };
+
+ writeStat->WriteTime = durationConv(stat.persisting_time());
+ writeStat->MinTimeInPartitionQueue = durationConv(stat.min_queue_wait_time());
+ writeStat->MaxTimeInPartitionQueue = durationConv(stat.max_queue_wait_time());
+ writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time());
+ writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time());
+
+ for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) {
+ // TODO: Fill writer statistics
+ auto ack = batchWriteResponse.acks(messageIndex);
+ ui64 sequenceNumber = ack.seq_no();
+
+ Y_VERIFY(ack.has_written() || ack.has_skipped());
+ auto msgWriteStatus = ack.has_written()
+ ? TWriteSessionEvent::TWriteAck::EES_WRITTEN
+ : (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN
+ ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
+ : TWriteSessionEvent::TWriteAck::EES_DISCARDED);
+
+ ui64 offset = ack.has_written() ? ack.written().offset() : 0;
+
+ acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{
+ sequenceNumber - SeqNoShift,
+ msgWriteStatus,
+ TWriteSessionEvent::TWriteAck::TWrittenMessageDetails {
+ offset,
+ PartitionId,
+ },
+ writeStat,
+ });
+
+ if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) {
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ }
+ }
+ //EventsQueue->PushEvent(std::move(acksEvent));
+ result.Events.emplace_back(std::move(acksEvent));
+ break;
+ }
+ case TServerMessage::kUpdateTokenResponse: {
+ UpdateTokenInProgress = false;
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
+ UpdateTokenIfNeededImpl();
+ break;
+ }
+ }
+ return result;
+}
+
+bool TWriteSession::CleanupOnAcknowledged(ui64 sequenceNumber) {
+ bool result = false;
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
+ UpdateTimedCountersImpl();
+ const auto& sentFront = SentOriginalMessages.front();
+ ui64 size = 0;
+ ui64 compressedSize = 0;
+ if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
+ auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
+ result = memoryUsage.NowOk && !memoryUsage.WasOk;
+ //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ const auto& front = SentPackedMessage.front();
+ if (front.Compressed) {
+ compressedSize = front.Data.size();
+ } else {
+ size = front.Data.size();
+ }
+
+ (*Counters->MessagesWritten) += front.MessageCount;
+ (*Counters->MessagesInflight) -= front.MessageCount;
+ (*Counters->BytesWritten) += front.OriginalSize;
+
+ SentPackedMessage.pop();
+ } else {
+ size = sentFront.Size;
+ (*Counters->BytesWritten) += sentFront.Size;
+ (*Counters->MessagesWritten)++;
+ (*Counters->MessagesInflight)--;
+ }
+
+ (*Counters->BytesInflightCompressed) -= compressedSize;
+ (*Counters->BytesWrittenCompressed) += compressedSize;
+ (*Counters->BytesInflightUncompressed) -= size;
+
+ Y_VERIFY(Counters->BytesInflightCompressed->Val() >= 0);
+ Y_VERIFY(Counters->BytesInflightUncompressed->Val() >= 0);
+
+ Y_VERIFY(sentFront.SeqNo == sequenceNumber);
+
+ (*Counters->BytesInflightTotal) = MemoryUsage;
+ SentOriginalMessages.pop();
+ return result;
+}
+
+// Only called under Lock
+TMemoryUsageChange TWriteSession::OnMemoryUsageChangedImpl(i64 diff) {
+ bool wasOk = MemoryUsage <= Settings.MaxMemoryUsage_;
+ //if (diff < 0) {
+ // Y_VERIFY(MemoryUsage >= static_cast<size_t>(std::abs(diff)));
+ //}
+ MemoryUsage += diff;
+ bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_;
+ if (wasOk != nowOk) {
+ if (wasOk) {
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Estimated memory usage " << MemoryUsage
+ << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])"
+ );
+ }
+ else {
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"
+ );
+ }
+ }
+ return {wasOk, nowOk};
+}
+
+TBuffer CompressBuffer(TVector<TStringBuf>& data, ECodec codec, i32 level) {
+ TBuffer result;
+ THolder<IOutputStream> coder = NCompressionDetails::CreateCoder(codec, result, level);
+ for (auto& buffer : data) {
+ coder->Write(buffer.data(), buffer.size());
+ }
+ coder->Finish();
+ return result;
+}
+
+// May call OnCompressed with sync executor. No external lock.
+void TWriteSession::CompressImpl(TBlock&& block_) {
+ auto weakThis = weak_from_this();
+ bool isSyncCompression = !CompressionExecutor->IsAsync();
+ Y_VERIFY(block_.Valid);
+
+ std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>());
+ blockPtr->Move(block_);
+ auto lambda = [weakThis, codec = Settings.Codec_, level = Settings.CompressionLevel_,
+ isSyncCompression, blockPtr]() mutable
+ {
+ if (auto sharedThis = weakThis.lock()) {
+ Y_VERIFY(!blockPtr->Compressed);
+
+ auto compressedData = CompressBuffer(
+ blockPtr->OriginalDataRefs, codec, level
+ );
+ Y_VERIFY(!compressedData.Empty());
+ blockPtr->Data = std::move(compressedData);
+ blockPtr->Compressed = true;
+ blockPtr->CodecID = static_cast<ui32>(sharedThis->Settings.Codec_);
+ sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression);
+ }
+ };
+
+ CompressionExecutor->Post(lambda);
+}
+
+void TWriteSession::OnCompressed(TBlock&& block, bool isSyncCompression) {
+ TMemoryUsageChange memoryUsage;
+ if (!isSyncCompression) {
+ with_lock(Lock) {
+ memoryUsage = OnCompressedImpl(std::move(block));
+ }
+ } else {
+ memoryUsage = OnCompressedImpl(std::move(block));
+ }
+ if (memoryUsage.NowOk && !memoryUsage.WasOk) {
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ }
+}
+
+//Called under lock or synchronously if compression is sync
+TMemoryUsageChange TWriteSession::OnCompressedImpl(TBlock&& block) {
+ UpdateTimedCountersImpl();
+ Y_VERIFY(block.Valid);
+ auto memoryUsage = OnMemoryUsageChangedImpl(static_cast<i64>(block.Data.size()) - block.OriginalMemoryUsage);
+ (*Counters->BytesInflightUncompressed) -= block.OriginalSize;
+ (*Counters->BytesInflightCompressed) += block.Data.size();
+
+ PackedMessagesToSend.emplace(std::move(block));
+ SendImpl();
+ return memoryUsage;
+}
+
+// Only called under lock
+void TWriteSession::ResetForRetryImpl() {
+ SessionEstablished = false;
+ const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size();
+ const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size();
+ while (!SentPackedMessage.empty()) {
+ PackedMessagesToSend.emplace(std::move(SentPackedMessage.front()));
+ SentPackedMessage.pop();
+ }
+ ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset;
+ std::queue<TOriginalMessage> freshOriginalMessagesToSend;
+ OriginalMessagesToSend.swap(freshOriginalMessagesToSend);
+ while (!SentOriginalMessages.empty()) {
+ OriginalMessagesToSend.emplace(std::move(SentOriginalMessages.front()));
+ SentOriginalMessages.pop();
+ }
+ while (!freshOriginalMessagesToSend.empty()) {
+ OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front()));
+ freshOriginalMessagesToSend.pop();
+ }
+ if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo)
+ minSeqNo = OriginalMessagesToSend.front().SeqNo;
+ MinUnsentSeqNo = minSeqNo;
+ Y_VERIFY(PackedMessagesToSend.size() == totalPackedMessages);
+ Y_VERIFY(OriginalMessagesToSend.size() == totalOriginalMessages);
+}
+
+// Called from client Write() methods. With lock
+void TWriteSession::FlushWriteIfRequiredImpl() {
+
+ if (!CurrentBatch.Empty() && !CurrentBatch.FlushRequested) {
+ MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire());
+ if (TInstant::Now() - CurrentBatch.StartedAt >= Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero())
+ || CurrentBatch.CurrentSize >= Settings.BatchFlushSizeBytes_.GetOrElse(0)
+ || CurrentBatch.CurrentSize >= MaxBlockSize
+ || CurrentBatch.Messages.size() >= MaxBlockMessageCount
+ || CurrentBatch.HasCodec()
+ ) {
+ WriteBatchImpl();
+ return;
+ }
+ }
+}
+
+
+// Involves compression, but still called under lock.
+size_t TWriteSession::WriteBatchImpl() {
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from "
+ << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo
+ );
+
+ Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount);
+
+ const bool skipCompression = Settings.Codec_ == ECodec::RAW || CurrentBatch.HasCodec();
+ if (!skipCompression && Settings.CompressionExecutor_->IsAsync()) {
+ MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire());
+ }
+
+ size_t size = 0;
+ for (size_t i = 0; i != CurrentBatch.Messages.size();) {
+ TBlock block{};
+ for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) {
+ auto sequenceNumber = CurrentBatch.Messages[i].SeqNo;
+ auto createTs = CurrentBatch.Messages[i].CreatedAt;
+
+ if (!block.MessageCount) {
+ block.Offset = sequenceNumber;
+ }
+
+ block.MessageCount += 1;
+ const auto& datum = CurrentBatch.Messages[i].DataRef;
+ block.OriginalSize += datum.size();
+ block.OriginalMemoryUsage = CurrentBatch.Data.size();
+ block.OriginalDataRefs.emplace_back(datum);
+ if (CurrentBatch.Messages[i].Codec.Defined()) {
+ Y_VERIFY(CurrentBatch.Messages.size() == 1);
+ block.CodecID = static_cast<ui32>(*CurrentBatch.Messages[i].Codec);
+ block.OriginalSize = CurrentBatch.Messages[i].OriginalSize;
+ block.Compressed = false;
+ }
+ size += datum.size();
+ UpdateTimedCountersImpl();
+ (*Counters->BytesInflightUncompressed) += datum.size();
+ (*Counters->MessagesInflight)++;
+ OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size());
+ }
+ block.Data = std::move(CurrentBatch.Data);
+ if (skipCompression) {
+ PackedMessagesToSend.emplace(std::move(block));
+ } else {
+ CompressImpl(std::move(block));
+ }
+ }
+ CurrentBatch.Reset();
+ if (skipCompression) {
+ SendImpl();
+ }
+ return size;
+}
+
+size_t GetMaxGrpcMessageSize() {
+ return 120_MB;
+}
+
+bool TWriteSession::IsReadyToSendNextImpl() const {
+ if (!SessionEstablished) {
+ return false;
+ }
+ if (Aborting)
+ return false;
+ if (PackedMessagesToSend.empty()) {
+ return false;
+ }
+ Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages");
+ Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)");
+
+ return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo;
+}
+
+
+void TWriteSession::UpdateTokenIfNeededImpl() {
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
+
+ if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished)
+ return;
+ TClientMessage clientMessage;
+ auto* updateRequest = clientMessage.mutable_update_token_request();
+ auto token = DbDriverState->CredentialsProvider->GetAuthInfo();
+ if (token == PrevToken)
+ return;
+ UpdateTokenInProgress = true;
+ updateRequest->set_token(token);
+ PrevToken = token;
+
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token");
+
+ Processor->Write(std::move(clientMessage));
+}
+
+void TWriteSession::SendImpl() {
+ // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB
+ while(IsReadyToSendNextImpl()) {
+ TClientMessage clientMessage;
+ auto* writeRequest = clientMessage.mutable_write_request();
+
+ // Sent blocks while we can without messages reordering
+ while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) {
+ const auto& block = PackedMessagesToSend.top();
+ Y_VERIFY(block.Valid);
+ writeRequest->set_codec(static_cast<i32>(block.CodecID));
+ Y_VERIFY(block.MessageCount == 1);
+ for (size_t i = 0; i != block.MessageCount; ++i) {
+ Y_VERIFY(!OriginalMessagesToSend.empty());
+
+ auto& message = OriginalMessagesToSend.front();
+
+ auto* msgData = writeRequest->add_messages();
+
+
+ msgData->set_seq_no(message.SeqNo + SeqNoShift);
+ *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
+
+ SentOriginalMessages.emplace(std::move(message));
+ OriginalMessagesToSend.pop();
+
+ msgData->set_uncompressed_size(block.OriginalSize);
+ if (block.Compressed)
+ msgData->set_data(block.Data.data(), block.Data.size());
+ else {
+ for (auto& buffer: block.OriginalDataRefs) {
+ msgData->set_data(buffer.data(), buffer.size());
+ }
+ }
+ }
+
+
+ TBlock moveBlock;
+ moveBlock.Move(block);
+ SentPackedMessage.emplace(std::move(moveBlock));
+ PackedMessagesToSend.pop();
+ }
+ UpdateTokenIfNeededImpl();
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) ("
+ << OriginalMessagesToSend.size() << " left), first sequence number is "
+ << writeRequest->messages(0).seq_no()
+ );
+ Processor->Write(std::move(clientMessage));
+ }
+}
+
+// Client method, no Lock
+bool TWriteSession::Close(TDuration closeTimeout) {
+ if (AtomicGet(Aborting))
+ return false;
+ DbDriverState->Log.Write(
+ TLOG_INFO,
+ LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms"
+ );
+ auto startTime = TInstant::Now();
+ auto remaining = closeTimeout;
+ bool ready = false;
+ bool needSetSeqNoValue = false;
+ while (remaining > TDuration::Zero()) {
+ with_lock(Lock) {
+ if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) {
+ ready = true;
+ }
+ if (AtomicGet(Aborting))
+ break;
+ }
+ if (ready) {
+ break;
+ }
+ remaining = closeTimeout - (TInstant::Now() - startTime);
+ Sleep(Min(TDuration::MilliSeconds(100), remaining));
+ }
+ with_lock(Lock) {
+ ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting);
+ }
+ with_lock(Lock) {
+ CloseImpl(EStatus::SUCCESS, NYql::TIssues{});
+ needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true);
+ }
+ if (needSetSeqNoValue) {
+ InitSeqNoPromise.SetException("session closed");
+ }
+ if (ready) {
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
+ } else {
+ DbDriverState->Log.Write(
+ TLOG_WARNING,
+ LogPrefix() << "Write session: could not confirm all writes in time"
+ << " or session aborted, perform hard shutdown"
+ );
+ }
+ return ready;
+}
+
+void TWriteSession::HandleWakeUpImpl() {
+ FlushWriteIfRequiredImpl();
+ if (AtomicGet(Aborting)) {
+ return;
+ }
+ auto callback = [weakThis = this->weak_from_this()] (bool ok)
+ {
+ if (!ok)
+ return;
+ if (auto sharedThis = weakThis.lock()) {
+ with_lock(sharedThis->Lock) {
+ sharedThis->HandleWakeUpImpl();
+ }
+ }
+ };
+ auto enqueueTokenCallback = [weakThis = this->weak_from_this()] (bool ok) {
+ if (!ok)
+ return;
+ if (auto sharedThis = weakThis.lock()) {
+ sharedThis->EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ }
+ };
+ if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {
+ LastTokenUpdate = TInstant::Now();
+ UpdateTokenIfNeededImpl();
+ }
+
+ const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero()
+ ? WakeupInterval
+ : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval);
+ Connections->ScheduleCallback(flushAfter, std::move(callback));
+}
+
+void TWriteSession::UpdateTimedCountersImpl() {
+ auto now = TInstant::Now();
+ auto delta = (now - LastCountersUpdateTs).MilliSeconds();
+ double percent = 100.0 / Settings.MaxMemoryUsage_;
+
+ Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta);
+ Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta);
+ Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta);
+
+ *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds();
+ LastCountersUpdateTs = now;
+ if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) {
+ LastCountersLogTs = TInstant::Now();
+
+#define LOG_COUNTER(counter) \
+ << " " Y_STRINGIZE(counter) ": " \
+ << Counters->counter->Val() \
+ /**/
+
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix()
+ << "Counters: {"
+ LOG_COUNTER(Errors)
+ LOG_COUNTER(CurrentSessionLifetimeMs)
+ LOG_COUNTER(BytesWritten)
+ LOG_COUNTER(MessagesWritten)
+ LOG_COUNTER(BytesWrittenCompressed)
+ LOG_COUNTER(BytesInflightUncompressed)
+ LOG_COUNTER(BytesInflightCompressed)
+ LOG_COUNTER(BytesInflightTotal)
+ LOG_COUNTER(MessagesInflight)
+ << " }"
+ );
+
+#undef LOG_COUNTER
+ }
+}
+
+void TWriteSession::AbortImpl() {
+ if (!AtomicGet(Aborting)) {
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting");
+ AtomicSet(Aborting, 1);
+ NPersQueue::Cancel(ConnectContext);
+ NPersQueue::Cancel(ConnectTimeoutContext);
+ NPersQueue::Cancel(ConnectDelayContext);
+ if (Processor)
+ Processor->Cancel();
+
+ NPersQueue::Cancel(ClientContext);
+ ClientContext.reset(); // removes context from contexts set from underlying gRPC-client.
+ }
+}
+
+void TWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
+ EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
+ AbortImpl();
+}
+
+void TWriteSession::CloseImpl(EStatus statusCode, const TString& message) {
+ NYql::TIssues issues;
+ issues.AddIssue(message);
+ CloseImpl(statusCode, std::move(issues));
+}
+
+void TWriteSession::CloseImpl(TPlainStatus&& status) {
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
+ EventsQueue->Close(TSessionClosedEvent(std::move(status)));
+ AbortImpl();
+}
+
+TWriteSession::~TWriteSession() {
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy");
+ bool needClose = false;
+ with_lock(Lock) {
+ if (!AtomicGet(Aborting)) {
+ CloseImpl(EStatus::SUCCESS, NYql::TIssues{});
+
+ needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true);
+ }
+ }
+ if (needClose)
+ InitSeqNoPromise.SetException("session closed");
+}
+
+TSimpleBlockingWriteSession::TSimpleBlockingWriteSession(
+ const TWriteSessionSettings& settings,
+ std::shared_ptr<TTopicClient::TImpl> client,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ TDbDriverStatePtr dbDriverState
+) {
+ auto alteredSettings = settings;
+ alteredSettings.EventHandlers_.AcksHandler_ = [this](TWriteSessionEvent::TAcksEvent& event) {this->HandleAck(event); };
+ alteredSettings.EventHandlers_.ReadyToAcceptHander_ = [this](TWriteSessionEvent::TReadyToAcceptEvent& event)
+ {this->HandleReady(event); };
+ alteredSettings.EventHandlers_.SessionClosedHandler_ = [this](const TSessionClosedEvent& event) {this->HandleClosed(event); };
+
+ Writer = std::make_shared<TWriteSession>(
+ alteredSettings, client, connections, dbDriverState
+ );
+ Writer->Start(TDuration::Max());
+}
+
+ui64 TSimpleBlockingWriteSession::GetInitSeqNo() {
+ return Writer->GetInitSeqNo().GetValueSync();
+}
+
+bool TSimpleBlockingWriteSession::Write(
+ TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp, const TDuration& blockTimeout
+) {
+ if (!IsAlive())
+ return false;
+
+ auto continuationToken = WaitForToken(blockTimeout);
+ if (continuationToken.Defined()) {
+ Writer->Write(std::move(*continuationToken), std::move(data), seqNo, createTimestamp);
+ return true;
+ }
+ return false;
+}
+
+TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) {
+ auto startTime = TInstant::Now();
+ TDuration remainingTime = timeout;
+ TMaybe<TContinuationToken> token = Nothing();
+ while(!token.Defined() && remainingTime > TDuration::Zero()) {
+ with_lock(Lock) {
+ if (!ContinueTokens.empty()) {
+ token = std::move(ContinueTokens.front());
+ ContinueTokens.pop();
+ }
+ }
+ if (!IsAlive())
+ return Nothing();
+
+ if (token.Defined()) {
+ return std::move(*token);
+ }
+ else {
+ remainingTime = timeout - (TInstant::Now() - startTime);
+ Sleep(Min(remainingTime, TDuration::MilliSeconds(100)));
+ }
+ }
+ return Nothing();
+}
+
+TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() {
+ return Writer->GetCounters();
+}
+
+
+bool TSimpleBlockingWriteSession::IsAlive() const {
+ bool closed = false;
+ with_lock(Lock) {
+ closed = Closed;
+ }
+ return !closed;
+}
+
+void TSimpleBlockingWriteSession::HandleAck(TWriteSessionEvent::TAcksEvent& event) {
+ Y_UNUSED(event);
+}
+
+void TSimpleBlockingWriteSession::HandleReady(TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ with_lock(Lock) {
+ ContinueTokens.emplace(std::move(event.ContinuationToken));
+ }
+}
+void TSimpleBlockingWriteSession::HandleClosed(const TSessionClosedEvent&) {
+ with_lock(Lock) {
+ Closed = true;
+ }
+}
+bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) {
+ return Writer->Close(std::move(closeTimeout));
+}
+
+}; // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
new file mode 100644
index 00000000000..bfe94e21458
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -0,0 +1,464 @@
+#pragma once
+
+#include "topic_impl.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+#include <util/generic/buffer.h>
+
+
+namespace NYdb::NTopic {
+
+inline const TString& GetCodecId(const ECodec codec) {
+ static THashMap<ECodec, TString> idByCodec{
+ {ECodec::RAW, TString(1, '\0')},
+ {ECodec::GZIP, "\1"},
+ {ECodec::LZOP, "\2"},
+ {ECodec::ZSTD, "\3"}
+ };
+ Y_VERIFY(idByCodec.contains(codec));
+ return idByCodec[codec];
+}
+
+class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> {
+ using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>;
+
+public:
+ TWriteSessionEventsQueue(const TWriteSessionSettings& settings)
+ : TParent(settings)
+ {}
+
+ void PushEvent(TEventInfo eventInfo) {
+ if (Closed || ApplyHandler(eventInfo)) {
+ return;
+ }
+
+ NPersQueue::TWaiter waiter;
+ with_lock (Mutex) {
+ Events.emplace(std::move(eventInfo));
+ waiter = PopWaiterImpl();
+ }
+ waiter.Signal(); // Does nothing if waiter is empty.
+ }
+
+ TMaybe<TEvent> GetEvent(bool block = false) {
+ TMaybe<TEventInfo> eventInfo;
+ with_lock (Mutex) {
+ if (block) {
+ WaitEventsImpl();
+ }
+ if (HasEventsImpl()) {
+ eventInfo = GetEventImpl();
+ } else {
+ return Nothing();
+ }
+ }
+ eventInfo->OnUserRetrievedEvent();
+ return std::move(eventInfo->Event);
+ }
+
+ TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) {
+ TVector<TEventInfo> eventInfos;
+ with_lock (Mutex) {
+ if (block) {
+ WaitEventsImpl();
+ }
+ eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max()));
+ while (!Events.empty()) {
+ eventInfos.emplace_back(GetEventImpl());
+ if (maxEventsCount && eventInfos.size() >= *maxEventsCount) {
+ break;
+ }
+ }
+ if (CloseEvent && Events.empty() && (!maxEventsCount || eventInfos.size() < *maxEventsCount)) {
+ eventInfos.push_back({*CloseEvent});
+ }
+ }
+
+ TVector<TEvent> result;
+ result.reserve(eventInfos.size());
+ for (TEventInfo& eventInfo : eventInfos) {
+ eventInfo.OnUserRetrievedEvent();
+ result.emplace_back(std::move(eventInfo.Event));
+ }
+ return result;
+ }
+
+ void Close(const TSessionClosedEvent& event) {
+ NPersQueue::TWaiter waiter;
+ with_lock (Mutex) {
+ CloseEvent = event;
+ Closed = true;
+ waiter = NPersQueue::TWaiter(Waiter.ExtractPromise(), this);
+ }
+
+ TEventInfo info(event);
+ ApplyHandler(info);
+
+ waiter.Signal();
+ }
+
+private:
+ struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
+ using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor;
+#define DECLARE_HANDLER(type, handler, answer) \
+ bool operator()(type& event) { \
+ if (Settings.EventHandlers_.handler) { \
+ Settings.EventHandlers_.handler(event); \
+ return answer; \
+ } \
+ return false; \
+ } \
+ /**/
+ DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true);
+ DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true);
+ DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied
+
+#undef DECLARE_HANDLER
+ bool Visit() {
+ return std::visit(*this, EventInfo.Event);
+ }
+
+ };
+
+ bool ApplyHandler(TEventInfo& eventInfo) {
+ THandlersVisitor visitor(Settings, eventInfo);
+ return visitor.Visit();
+ }
+
+ TEventInfo GetEventImpl() { // Assumes that we're under lock and that the event queue has events.
+ Y_ASSERT(HasEventsImpl());
+ if (!Events.empty()) {
+ TEventInfo event = std::move(Events.front());
+ Events.pop();
+ RenewWaiterImpl();
+ return event;
+ }
+ Y_ASSERT(CloseEvent);
+ return {*CloseEvent};
+ }
+};
+
+struct TMemoryUsageChange {
+ bool WasOk; //!< MemoryUsage <= Config.MaxMemoryUsage_ before update
+ bool NowOk; //!< Same, only after update
+};
+
+class TWriteSession : public IWriteSession,
+ public std::enable_shared_from_this<TWriteSession> {
+private:
+ friend class TSimpleBlockingWriteSession;
+ friend class TTopicClient;
+
+ using TClientMessage = Ydb::Topic::StreamWriteMessage::FromClient;
+ using TServerMessage = Ydb::Topic::StreamWriteMessage::FromServer;
+ using IWriteSessionConnectionProcessorFactory =
+ TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory;
+ using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor;
+
+ struct TMessage {
+ ui64 SeqNo;
+ TInstant CreatedAt;
+ TStringBuf DataRef;
+ TMaybe<ECodec> Codec;
+ ui32 OriginalSize; // only for coded messages
+ TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0)
+ : SeqNo(seqNo)
+ , CreatedAt(createdAt)
+ , DataRef(data)
+ , Codec(codec)
+ , OriginalSize(originalSize)
+ {}
+ };
+
+ struct TMessageBatch {
+ TBuffer Data;
+ TVector<TMessage> Messages;
+ ui64 CurrentSize = 0;
+ TInstant StartedAt = TInstant::Zero();
+ bool Acquired = false;
+ bool FlushRequested = false;
+ void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) {
+ if (StartedAt == TInstant::Zero())
+ StartedAt = TInstant::Now();
+ CurrentSize += codec ? originalSize : data.size();
+ Messages.emplace_back(seqNo, createdAt, data, codec, originalSize);
+ Acquired = false;
+ }
+
+ bool HasCodec() const {
+ return Messages.empty() ? false : Messages.front().Codec.Defined();
+ }
+
+ bool Acquire() {
+ if (Acquired || Messages.empty())
+ return false;
+ auto currSize = Data.size();
+ Data.Append(Messages.back().DataRef.data(), Messages.back().DataRef.size());
+ Messages.back().DataRef = TStringBuf(Data.data() + currSize, Data.size() - currSize);
+ Acquired = true;
+ return true;
+ }
+
+ bool Empty() const noexcept {
+ return CurrentSize == 0 && Messages.empty();
+ }
+
+ void Reset() {
+ StartedAt = TInstant::Zero();
+ Messages.clear();
+ Data.Clear();
+ Acquired = false;
+ CurrentSize = 0;
+ FlushRequested = false;
+ }
+ };
+
+ struct TBlock {
+ size_t Offset = 0; //!< First message sequence number in the block
+ size_t MessageCount = 0;
+ size_t PartNumber = 0;
+ size_t OriginalSize = 0;
+ size_t OriginalMemoryUsage = 0;
+ ui32 CodecID = static_cast<ui32>(ECodec::RAW);
+ mutable TVector<TStringBuf> OriginalDataRefs;
+ mutable TBuffer Data;
+ bool Compressed = false;
+ mutable bool Valid = true;
+
+ TBlock& operator=(TBlock&&) = default;
+ TBlock(TBlock&&) = default;
+ TBlock() = default;
+
+ //For taking ownership by copying from const object, f.e. lambda -> std::function, priority_queue
+ void Move(const TBlock& rhs) {
+ Offset = rhs.Offset;
+ MessageCount = rhs.MessageCount;
+ PartNumber = rhs.PartNumber;
+ OriginalSize = rhs.OriginalSize;
+ OriginalMemoryUsage = rhs.OriginalMemoryUsage;
+ CodecID = rhs.CodecID;
+ OriginalDataRefs.swap(rhs.OriginalDataRefs);
+ Data.Swap(rhs.Data);
+ Compressed = rhs.Compressed;
+
+ rhs.Data.Clear();
+ rhs.OriginalDataRefs.clear();
+ }
+ };
+
+ struct TOriginalMessage {
+ ui64 SeqNo;
+ TInstant CreatedAt;
+ size_t Size;
+ TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size)
+ : SeqNo(sequenceNumber)
+ , CreatedAt(createdAt)
+ , Size(size)
+ {}
+ };
+
+ //! Block comparer, makes block with smallest offset (first sequence number) appear on top of the PackedMessagesToSend priority queue
+ struct Greater {
+ bool operator() (const TBlock& lhs, const TBlock& rhs) {
+ return lhs.Offset > rhs.Offset;
+ }
+ };
+
+ struct THandleResult {
+ bool DoRestart = false;
+ TDuration StartDelay = TDuration::Zero();
+ bool DoStop = false;
+ bool DoSetSeqNo = false;
+ };
+ struct TProcessSrvMessageResult {
+ THandleResult HandleResult;
+ TMaybe<ui64> InitSeqNo;
+ TVector<TWriteSessionEvent::TEvent> Events;
+ bool Ok = true;
+ };
+
+ THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action
+
+public:
+ TWriteSession(const TWriteSessionSettings& settings,
+ std::shared_ptr<TTopicClient::TImpl> client,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ TDbDriverStatePtr dbDriverState);
+
+ TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override;
+ TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false,
+ TMaybe<size_t> maxEventsCount = Nothing()) override;
+ NThreading::TFuture<ui64> GetInitSeqNo() override;
+
+ void Write(TContinuationToken&& continuationToken, TStringBuf data,
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+
+ void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+
+
+ NThreading::TFuture<void> WaitEvent() override;
+
+ // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration.
+ bool Close(TDuration closeTimeout = TDuration::Max()) override;
+
+ TWriterCounters::TPtr GetCounters() override {Y_FAIL("Unimplemented"); } //ToDo - unimplemented;
+
+ ~TWriteSession(); // will not call close - destroy everything without acks
+
+private:
+
+ TStringBuilder LogPrefix() const;
+
+ void UpdateTokenIfNeededImpl();
+
+ void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing());
+
+ void FlushWriteIfRequiredImpl();
+ size_t WriteBatchImpl();
+ void Start(const TDuration& delay);
+ void InitWriter();
+
+ void OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor,
+ const NGrpc::IQueueClientContextPtr& connectContext);
+ void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext);
+ void ResetForRetryImpl();
+ THandleResult RestartImpl(const TPlainStatus& status);
+ void DoConnect(const TDuration& delay, const TString& endpoint);
+ void InitImpl();
+ void ReadFromProcessor(); // Assumes that we're under lock.
+ void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock.
+ void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration);
+ void OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration);
+ TProcessSrvMessageResult ProcessServerMessageImpl();
+ TMemoryUsageChange OnMemoryUsageChangedImpl(i64 diff);
+ void CompressImpl(TBlock&& block);
+ void OnCompressed(TBlock&& block, bool isSyncCompression=false);
+ TMemoryUsageChange OnCompressedImpl(TBlock&& block);
+
+ //TString GetDebugIdentity() const;
+ TClientMessage GetInitClientMessage();
+ bool CleanupOnAcknowledged(ui64 sequenceNumber);
+ bool IsReadyToSendNextImpl() const;
+ ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo);
+ void SendImpl();
+ void AbortImpl();
+ void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
+ void CloseImpl(EStatus statusCode, const TString& message);
+ void CloseImpl(TPlainStatus&& status);
+
+ void OnErrorResolved() {
+ RetryState = nullptr;
+ }
+ void CheckHandleResultImpl(THandleResult& result);
+ void ProcessHandleResult(THandleResult& result);
+ void HandleWakeUpImpl();
+ void UpdateTimedCountersImpl();
+
+private:
+ TWriteSessionSettings Settings;
+ std::shared_ptr<TTopicClient::TImpl> Client;
+ std::shared_ptr<TGRpcConnectionsImpl> Connections;
+ TString TargetCluster;
+ TString InitialCluster;
+ TString CurrentCluster;
+ bool OnSeqNoShift = false;
+ TString PreferredClusterByCDS;
+ std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory;
+ TDbDriverStatePtr DbDriverState;
+ TStringType PrevToken;
+ bool UpdateTokenInProgress = false;
+ TInstant LastTokenUpdate = TInstant::Zero();
+ std::shared_ptr<TWriteSessionEventsQueue> EventsQueue;
+ NGrpc::IQueueClientContextPtr ClientContext; // Common client context.
+ NGrpc::IQueueClientContextPtr ConnectContext;
+ NGrpc::IQueueClientContextPtr ConnectTimeoutContext;
+ NGrpc::IQueueClientContextPtr ConnectDelayContext;
+ size_t ConnectionGeneration = 0;
+ size_t ConnectionAttemptsDone = 0;
+ TAdaptiveLock Lock;
+ IProcessor::TPtr Processor;
+ IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
+ std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to.
+
+ TString SessionId;
+ IExecutor::TPtr Executor;
+ IExecutor::TPtr CompressionExecutor;
+ size_t MemoryUsage = 0; //!< Estimated amount of memory used
+
+ TMessageBatch CurrentBatch;
+
+ std::queue<TOriginalMessage> OriginalMessagesToSend;
+ std::priority_queue<TBlock, std::vector<TBlock>, Greater> PackedMessagesToSend;
+ //! Messages that are sent but yet not acknowledged
+ std::queue<TOriginalMessage> SentOriginalMessages;
+ std::queue<TBlock> SentPackedMessage;
+
+ const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
+ const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
+ bool Connected = false;
+ bool Started = false;
+ TAtomic Aborting = 0;
+ bool SessionEstablished = false;
+ ui32 PartitionId = 0;
+ ui64 LastSeqNo = 0;
+ ui64 MinUnsentSeqNo = 0;
+ ui64 SeqNoShift = 0;
+ TMaybe<bool> AutoSeqNoMode;
+ bool ValidateSeqNoMode = false;
+
+ NThreading::TPromise<ui64> InitSeqNoPromise;
+ bool InitSeqNoSetDone = false;
+ TInstant SessionStartedTs;
+ TInstant LastCountersUpdateTs = TInstant::Zero();
+ TInstant LastCountersLogTs;
+ TWriterCounters::TPtr Counters;
+ TDuration WakeupInterval;
+
+protected:
+ ui64 MessagesAcquired = 0;
+};
+
+class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession {
+private:
+ using TClientMessage = TWriteSession::TClientMessage;
+ using TServerMessage = TWriteSession::TServerMessage;
+ using IWriteSessionConnectionProcessorFactory = TWriteSession::IWriteSessionConnectionProcessorFactory;
+ using IProcessor = TWriteSession::IProcessor;
+
+public:
+ TSimpleBlockingWriteSession(
+ const TWriteSessionSettings& settings,
+ std::shared_ptr<TTopicClient::TImpl> client,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ TDbDriverStatePtr dbDriverState);
+
+ bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(),
+ const TDuration& blockTimeout = TDuration::Max()) override;
+
+ ui64 GetInitSeqNo() override;
+
+ bool Close(TDuration closeTimeout = TDuration::Max()) override;
+ bool IsAlive() const override;
+
+ TWriterCounters::TPtr GetCounters() override;
+
+protected:
+ std::shared_ptr<TWriteSession> Writer;
+
+private:
+ TMaybe<TContinuationToken> WaitForToken(const TDuration& timeout);
+ void HandleAck(TWriteSessionEvent::TAcksEvent&);
+ void HandleReady(TWriteSessionEvent::TReadyToAcceptEvent&);
+ void HandleClosed(const TSessionClosedEvent&);
+
+ TAdaptiveLock Lock;
+ std::queue<TContinuationToken> ContinueTokens;
+ bool Closed = false;
+};
+
+
+}; // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index db79186e70b..c6784a7f9cb 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -437,6 +437,53 @@ struct TSessionClosedEvent: public TStatus {
TString DebugString() const;
};
+struct TWriteStat : public TThrRefBase {
+ TDuration WriteTime;
+ TDuration MinTimeInPartitionQueue;
+ TDuration MaxTimeInPartitionQueue;
+ TDuration PartitionQuotedTime;
+ TDuration TopicQuotedTime;
+ using TPtr = TIntrusivePtr<TWriteStat>;
+};
+
+class TContinuationToken : public TMoveOnly {
+ friend class TWriteSession;
+private:
+ TContinuationToken() = default;
+};
+
+struct TWriterCounters : public TThrRefBase {
+ using TSelf = TWriterCounters;
+ using TPtr = TIntrusivePtr<TSelf>;
+
+ explicit TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
+
+ ::NMonitoring::TDynamicCounters::TCounterPtr Errors;
+ ::NMonitoring::TDynamicCounters::TCounterPtr CurrentSessionLifetimeMs;
+
+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesWritten;
+ ::NMonitoring::TDynamicCounters::TCounterPtr MessagesWritten;
+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesWrittenCompressed;
+
+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightUncompressed;
+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightCompressed;
+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightTotal;
+ ::NMonitoring::TDynamicCounters::TCounterPtr MessagesInflight;
+
+ //! Histograms reporting % usage of memory limit in time.
+ //! Provides a histogram looking like: 10% : 100ms, 20%: 300ms, ... 50%: 200ms, ... 100%: 50ms
+ //! Which means that < 10% memory usage was observed for 100ms during the period and 50% usage was observed for 200ms
+ //! Used to monitor if the writer successfully deals with data flow provided. Larger values in higher buckets
+ //! mean that writer is close to overflow (or being overflown) for major periods of time
+ //! 3 histograms stand for:
+ //! Total memory usage:
+ ::NMonitoring::THistogramPtr TotalBytesInflightUsageByTime;
+ //! Memory usage by messages waiting for comression:
+ ::NMonitoring::THistogramPtr UncompressedBytesInflightUsageByTime;
+ //! Memory usage by compressed messages pending for write:
+ ::NMonitoring::THistogramPtr CompressedBytesInflightUsageByTime;
+};
+
struct TReaderCounters: public TThrRefBase {
using TSelf = TReaderCounters;
using TPtr = TIntrusivePtr<TSelf>;
@@ -485,7 +532,7 @@ public:
//! Unique identifier of partition session.
//! It is unique within one read session.
- i64 GetPartitionSessionId() const {
+ ui64 GetPartitionSessionId() const {
return PartitionSessionId;
}
@@ -495,14 +542,14 @@ public:
}
//! Partition id.
- i64 GetPartitionId() const {
+ ui64 GetPartitionId() const {
return PartitionId;
}
protected:
- i64 PartitionSessionId;
+ ui64 PartitionSessionId;
TString TopicPath;
- i64 PartitionId;
+ ui64 PartitionId;
};
//! Events for read session.
@@ -963,8 +1010,169 @@ IExecutor::TPtr CreateThreadPoolExecutorAdapter(
std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started.
IExecutor::TPtr CreateThreadPoolExecutor(size_t threads);
+IExecutor::TPtr CreateSyncExecutor();
+
+//! Events for write session.
+struct TWriteSessionEvent {
+
+ //! Event with acknowledge for written messages.
+ struct TWriteAck {
+ //! Write result.
+ enum EEventState {
+ EES_WRITTEN, //! Successfully written.
+ EES_ALREADY_WRITTEN, //! Skipped on SeqNo deduplication.
+ EES_DISCARDED //! In case of destruction of writer or retry policy discarded future retries in this writer.
+ };
+ //! Details of successfully written message.
+ struct TWrittenMessageDetails {
+ ui64 Offset;
+ ui64 PartitionId;
+ };
+ //! Same SeqNo as provided on write.
+ ui64 SeqNo;
+ EEventState State;
+ //! Filled only for EES_WRITTEN. Empty for ALREADY and DISCARDED.
+ TMaybe<TWrittenMessageDetails> Details;
+ //! Write stats from server. See TWriteStat. nullptr for DISCARDED event.
+ TWriteStat::TPtr Stat;
+
+ };
+
+ struct TAcksEvent {
+ //! Acks could be batched from several WriteBatch/Write requests.
+ //! Acks for messages from one WriteBatch request could be emitted as several TAcksEvents -
+ //! they are provided to client as soon as possible.
+ TVector<TWriteAck> Acks;
+
+ TString DebugString() const;
+
+ };
+
+ //! Indicates that a writer is ready to accept new message(s).
+ //! Continuation token should be kept and then used in write methods.
+ struct TReadyToAcceptEvent {
+ TContinuationToken ContinuationToken;
+
+ TString DebugString() const;
+
+ };
+
+ using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>;
+};
+
+//! Event debug string.
+TString DebugString(const TWriteSessionEvent::TEvent& event);
+
using TSessionClosedHandler = std::function<void(const TSessionClosedEvent&)>;
+//! Settings for write session.
+struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
+ using TSelf = TWriteSessionSettings;
+
+ TWriteSessionSettings() = default;
+ TWriteSessionSettings(const TWriteSessionSettings&) = default;
+ TWriteSessionSettings(TWriteSessionSettings&&) = default;
+ TWriteSessionSettings(const TString& path, const TString& producerId, const TString& messageGroupId) {
+ Path(path);
+ ProducerId(producerId);
+ MessageGroupId(messageGroupId);
+ }
+
+ TWriteSessionSettings& operator=(const TWriteSessionSettings&) = default;
+ TWriteSessionSettings& operator=(TWriteSessionSettings&&) = default;
+
+ //! Path of topic to write.
+ FLUENT_SETTING(TString, Path);
+
+ //! ProducerId (aka SourceId) to use.
+ FLUENT_SETTING(TString, ProducerId);
+
+ //! MessageGroupId to use.
+ FLUENT_SETTING(TString, MessageGroupId);
+
+ //! Write to an exact partition. Generally server assigns partition automatically by message_group_id.
+ //! Using this option is not recommended unless you know for sure why you need it.
+ FLUENT_SETTING_OPTIONAL(ui32, PartitionId);
+
+ //! codec and level to use for data compression prior to write.
+ FLUENT_SETTING_DEFAULT(ECodec, Codec, ECodec::GZIP);
+ FLUENT_SETTING_DEFAULT(i32, CompressionLevel, 4);
+
+ //! Writer will not accept new messages if memory usage exceeds this limit.
+ //! Memory usage consists of raw data pending compression and compressed messages being sent.
+ FLUENT_SETTING_DEFAULT(ui64, MaxMemoryUsage, 20_MB);
+
+ //! Maximum messages accepted by writer but not written (with confirmation from server).
+ //! Writer will not accept new messages after reaching the limit.
+ FLUENT_SETTING_DEFAULT(ui32, MaxInflightCount, 100000);
+
+ //! Retry policy enables automatic retries for non-fatal errors.
+ //! IRetryPolicy::GetDefaultPolicy() if null (not set).
+ FLUENT_SETTING(IRetryPolicy::TPtr, RetryPolicy);
+
+ //! User metadata that may be attached to write session.
+ TWriteSessionSettings& AppendSessionMeta(const TString& key, const TString& value) {
+ Meta_.Fields[key] = value;
+ return *this;
+ };
+
+ TWriteSessionMeta Meta_;
+
+ //! Writer will accumulate messages until reaching up to BatchFlushSize bytes
+ //! but for no longer than BatchFlushInterval.
+ //! Upon reaching FlushInterval or FlushSize limit, all messages will be written with one batch.
+ //! Greatly increases performance for small messages.
+ //! Setting either value to zero means immediate write with no batching. (Unrecommended, especially for clients
+ //! sending small messages at high rate).
+ FLUENT_SETTING_OPTIONAL(TDuration, BatchFlushInterval);
+ FLUENT_SETTING_OPTIONAL(ui64, BatchFlushSizeBytes);
+
+ FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
+
+ FLUENT_SETTING_OPTIONAL(TWriterCounters::TPtr, Counters);
+
+ //! Executor for compression tasks.
+ //! If not set, default executor will be used.
+ FLUENT_SETTING(IExecutor::TPtr, CompressionExecutor);
+
+ struct TEventHandlers {
+ using TSelf = TEventHandlers;
+ using TWriteAckHandler = std::function<void(TWriteSessionEvent::TAcksEvent&)>;
+ using TReadyToAcceptHandler = std::function<void(TWriteSessionEvent::TReadyToAcceptEvent&)>;
+
+ //! Function to handle Acks events.
+ //! If this handler is set, write ack events will be handled by handler,
+ //! otherwise sent to TWriteSession::GetEvent().
+ FLUENT_SETTING(TWriteAckHandler, AcksHandler);
+
+ //! Function to handle ReadyToAccept event.
+ //! If this handler is set, write these events will be handled by handler,
+ //! otherwise sent to TWriteSession::GetEvent().
+ FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHander);
+
+ //! Function to handle close session events.
+ //! If this handler is set, close session events will be handled by handler
+ //! and then sent to TWriteSession::GetEvent().
+ FLUENT_SETTING(TSessionClosedHandler, SessionClosedHandler);
+
+ //! Function to handle all event types.
+ //! If event with current type has no handler for this type of event,
+ //! this handler (if specified) will be used.
+ //! If this handler is not specified, event can be received with TReadSession::GetEvent() method.
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TEvent&)>, CommonHandler);
+
+ //! Executor for handlers.
+ //! If not set, default single threaded executor will be used.
+ FLUENT_SETTING(IExecutor::TPtr, HandlersExecutor);
+ };
+
+ //! Event handlers.
+ FLUENT_SETTING(TEventHandlers, EventHandlers);
+
+ //! Enables validation of SeqNo. If enabled, then writer will check writing with seqNo and without it and throws exception.
+ FLUENT_SETTING_DEFAULT(bool, ValidateSeqNo, true);
+};
+
//! Read settings for single topic.
struct TTopicReadSettings {
using TSelf = TTopicReadSettings;
@@ -1118,6 +1326,72 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
FLUENT_SETTING_OPTIONAL(TLog, Log);
};
+//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks.
+class ISimpleBlockingWriteSession : public TThrRefBase {
+public:
+ //! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded;
+ //! return - true if write succeeded, false if message was not enqueued for write within blockTimeout.
+ //! no Ack is provided.
+ virtual bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(),
+ const TDuration& blockTimeout = TDuration::Max()) = 0;
+
+ //! Blocks till SeqNo is discovered from server. Returns 0 in case of failure on init.
+ virtual ui64 GetInitSeqNo() = 0;
+
+ //! Complete all active writes, wait for ack from server and close.
+ //! closeTimeout - max time to wait. Empty Maybe means infinity.
+ //! return - true if all writes were completed and acked. false if timeout was reached and some writes were aborted.
+
+ virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0;
+
+ //! Returns true if write session is alive and acitve. False if session was closed.
+ virtual bool IsAlive() const = 0;
+
+ virtual TWriterCounters::TPtr GetCounters() = 0;
+
+ //! Close immediately and destroy, don't wait for anything.
+ virtual ~ISimpleBlockingWriteSession() = default;
+};
+
+//! Generic write session with all capabilities.
+class IWriteSession {
+public:
+ //! Future that is set when next event is available.
+ virtual NThreading::TFuture<void> WaitEvent() = 0;
+
+ //! Wait and return next event. Use WaitEvent() for non-blocking wait.
+ virtual TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) = 0;
+
+ //! Get several events in one call.
+ //! If blocking = false, instantly returns up to maxEventsCount available events.
+ //! If blocking = true, blocks till maxEventsCount events are available.
+ //! If maxEventsCount is unset, write session decides the count to return itself.
+ virtual TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) = 0;
+
+ //! Future that is set when initial SeqNo is available.
+ virtual NThreading::TFuture<ui64> GetInitSeqNo() = 0;
+
+ //! Write single message.
+ //! continuationToken - a token earlier provided to client with ReadyToAccept event.
+ virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0;
+
+ //! Write single message that is already coded by codec. Codec from settings does not apply to this message.
+ //! continuationToken - a token earlier provided to client with ReadyToAccept event.
+ //! originalSize - size of unpacked message
+ virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0;
+
+
+ //! Wait for all writes to complete (no more that closeTimeout()), than close. Empty maybe - means infinite timeout.
+ //! return - true if all writes were completed and acked. false if timeout was reached and some writes were aborted.
+ virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0;
+
+ //! Writer counters with different stats (see TWriterConuters).
+ virtual TWriterCounters::TPtr GetCounters() = 0;
+
+ //! Close() with timeout = 0 and destroy everything instantly.
+ virtual ~IWriteSession() = default;
+};
+
class IReadSession {
public:
//! Main reader loop.
@@ -1190,6 +1464,10 @@ public:
//! Create read session.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
+ //! Create write session.
+ std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings);
+ std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings);
+
private:
std::shared_ptr<TImpl> Impl_;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt
index 2f5c02c216a..a950fc621c8 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt
@@ -28,7 +28,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC
cpp-client-ydb_persqueue_core
client-ydb_persqueue_core-impl
ydb_persqueue_core-ut-ut_utils
- client-ydb_persqueue_public-codecs
+ client-ydb_topic-codecs
client-ydb_topic-impl
)
target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt
index 7e0aedf76ef..9a818366aae 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt
@@ -30,7 +30,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC
cpp-client-ydb_persqueue_core
client-ydb_persqueue_core-impl
ydb_persqueue_core-ut-ut_utils
- client-ydb_persqueue_public-codecs
+ client-ydb_topic-codecs
client-ydb_topic-impl
)
target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE