diff options
author | ildar-khisam <ikhis@ydb.tech> | 2022-09-05 17:24:03 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2022-09-05 17:24:03 +0300 |
commit | f1ee295b388568dbcc56001aafe25ca1711ee49e (patch) | |
tree | 5802f4ab2235c2a189471f4ffcf778ce64f65cf8 | |
parent | 2a5400d4e3fc4b33e4217f4bff5ccdf452365d64 (diff) | |
download | ydb-f1ee295b388568dbcc56001aafe25ca1711ee49e.tar.gz |
topic writes in sdk
better
implement topic writes
18 files changed, 2165 insertions, 42 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index bc57fa2953c..649fc817e67 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -544,9 +544,9 @@ public: , Session(std::move(parentSession)) , ErrorHandler(std::move(errorHandler)) { - TAPartitionStream<false>::PartitionSessionId = static_cast<i64>(partitionStreamId); + TAPartitionStream<false>::PartitionSessionId = partitionStreamId; TAPartitionStream<false>::TopicPath = std::move(topicPath); - TAPartitionStream<false>::PartitionId = partitionId; + TAPartitionStream<false>::PartitionId = static_cast<ui64>(partitionId); MaxCommittedOffset = static_cast<ui64>(readOffset); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 4b606e03521..01a699a4129 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -22,10 +22,20 @@ #include <variant> +// Forward delcarations + namespace NYdb::NTopic { class TReadSession; } +namespace NYdb::NPersQueue::NCompressionDetails { + extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data); +} + +namespace NYdb::NTopic::NCompressionDetails { + extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec); +} + namespace NYdb::NPersQueue { static const bool RangesMode = !GetEnv("PQ_OFFSET_RANGES_MODE").empty(); @@ -2179,12 +2189,6 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompression , Ready(ready) { } -// Forward delcaration -namespace NCompressionDetails { - extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data); - extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec); -} - template<bool UseMigrationProtocol> void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator()() { i64 minOffset = Max<i64>(); @@ -2224,7 +2228,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator( && static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_RAW && static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_UNSPECIFIED ) { - TString decompressed = NCompressionDetails::Decompress(data, static_cast<Ydb::Topic::Codec>(batch.codec())); + TString decompressed = ::NYdb::NTopic::NCompressionDetails::Decompress(data, static_cast<Ydb::Topic::Codec>(batch.codec())); data.set_data(decompressed); } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp index 51c16f84f7a..bc2de2a1c50 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp @@ -25,21 +25,6 @@ IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, } } -IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, Ydb::Topic::Codec codec, IInputStream* origin) { - switch (codec) { - case Ydb::Topic::CODEC_GZIP: - return &inputStreamStorage.emplace<TZLibDecompress>(origin); - case Ydb::Topic::CODEC_LZOP: - throw yexception() << "LZO codec is disabled"; - case Ydb::Topic::CODEC_ZSTD: - return &inputStreamStorage.emplace<TZstdDecompress>(origin); - default: - //case Ydb::Topic::CODEC_RAW: - //case Ydb::Topic::CODEC_UNSPECIFIED: - throw yexception() << "unsupported codec value : " << ui64(codec); - } -} - TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data) { TMemoryInput input(data.data().data(), data.data().size()); TString result; @@ -49,15 +34,6 @@ TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage return result; } -TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec) { - TMemoryInput input(data.data().data(), data.data().size()); - TString result; - TStringOutput resultOutput(result); - TInputStreamVariant inputStreamStorage; - TransferData(CreateDecompressorStream(inputStreamStorage, codec, &input), &resultOutput); - return result; -} - class TZLibToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZLibCompress { public: TZLibToStringCompressor(TBuffer& dst, ZLib::StreamType type, size_t quality) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h index b18d14119e1..5b6b111c1e6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h @@ -9,7 +9,6 @@ namespace NYdb::NPersQueue { namespace NCompressionDetails { extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data); -extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec); THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt index 6fc60fb0c0b..24274dcbeef 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(codecs) add_subdirectory(impl) add_subdirectory(ut) @@ -14,11 +15,14 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC contrib-libs-cxxsupp yutil tools-enum_parser-enum_serialization_runtime - client-ydb_persqueue_public-codecs + client-ydb_topic-codecs library-cpp-retry client-ydb_topic-impl cpp-client-ydb_proto cpp-client-ydb_driver + api-grpc + api-grpc-draft + api-protos ) target_sources(cpp-client-ydb_topic PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt new file mode 100644 index 00000000000..7bbe8147995 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/codecs/CMakeLists.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(client-ydb_topic-codecs) +target_link_libraries(client-ydb_topic-codecs PUBLIC + contrib-libs-cxxsupp + yutil + cpp-streams-zstd + public-issue-protos + api-grpc-draft + api-grpc + api-protos +) +target_sources(client-ydb_topic-codecs PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp new file mode 100644 index 00000000000..44252130fda --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.cpp @@ -0,0 +1,71 @@ +#include <library/cpp/streams/zstd/zstd.h> +#include <util/stream/buffer.h> +#include <util/stream/zlib.h> +#include <util/stream/mem.h> + +#include "codecs.h" + +namespace NYdb::NTopic { +namespace NCompressionDetails { + +using TInputStreamVariant = std::variant<std::monostate, TZLibDecompress, TZstdDecompress>; + +IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, Ydb::Topic::Codec codec, IInputStream* origin) { + switch (codec) { + case Ydb::Topic::CODEC_GZIP: + return &inputStreamStorage.emplace<TZLibDecompress>(origin); + case Ydb::Topic::CODEC_LZOP: + throw yexception() << "LZO codec is disabled"; + case Ydb::Topic::CODEC_ZSTD: + return &inputStreamStorage.emplace<TZstdDecompress>(origin); + default: + //case Ydb::Topic::CODEC_RAW: + //case Ydb::Topic::CODEC_UNSPECIFIED: + throw yexception() << "unsupported codec value : " << ui64(codec); + } +} + +TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec) { + TMemoryInput input(data.data().data(), data.data().size()); + TString result; + TStringOutput resultOutput(result); + TInputStreamVariant inputStreamStorage; + TransferData(CreateDecompressorStream(inputStreamStorage, codec, &input), &resultOutput); + return result; +} + +class TZLibToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZLibCompress { +public: + TZLibToStringCompressor(TBuffer& dst, ZLib::StreamType type, size_t quality) + : TEmbedPolicy<TBufferOutput>(dst) + , TZLibCompress(TEmbedPolicy::Ptr(), type, quality) + { + } +}; + +class TZstdToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZstdCompress { +public: + TZstdToStringCompressor(TBuffer& dst, int quality) + : TEmbedPolicy<TBufferOutput>(dst) + , TZstdCompress(TEmbedPolicy::Ptr(), quality) + { + } +}; + +THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality) { + switch (codec) { + case ECodec::GZIP: + return MakeHolder<TZLibToStringCompressor>(result, ZLib::GZip, quality >= 0 ? quality : 6); + case ECodec::LZOP: + throw yexception() << "LZO codec is disabled"; + case ECodec::ZSTD: + return MakeHolder<TZstdToStringCompressor>(result, quality); + default: + Y_FAIL("NOT IMPLEMENTED CODEC TYPE"); + } +} + + +} // namespace NDecompressionDetails + +} // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h new file mode 100644 index 00000000000..79047730b89 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h @@ -0,0 +1,16 @@ +#pragma once +#include <util/stream/output.h> +#include <ydb/public/api/protos/ydb_topic.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + + +namespace NYdb::NTopic { +namespace NCompressionDetails { + +extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec); + +THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); + +} // namespace NDecompressionDetails + +} // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt index e138e1085f5..8e8d6bdb145 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt @@ -13,9 +13,11 @@ target_link_libraries(client-ydb_topic-impl PUBLIC yutil cpp-grpc-client cpp-monlib-dynamic_counters + cpp-monlib-metrics cpp-string_utils-url library-persqueue-obfuscate api-grpc-draft + api-grpc impl-ydb_internal-make_request client-ydb_common_client-impl cpp-client-ydb_driver @@ -29,6 +31,7 @@ target_sources(client-ydb_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/deferred_commit.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp index 368df898ac6..18bfccf8a49 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp @@ -88,4 +88,9 @@ TThreadPoolExecutor::TThreadPoolExecutor(size_t threadsCount) ThreadsCount = threadsCount; } +IExecutor::TPtr CreateSyncExecutor() +{ + return MakeIntrusive<TSyncExecutor>(); +} + } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index c82668f104d..f6abfca8226 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -234,4 +234,12 @@ std::shared_ptr<IReadSession> TTopicClient::CreateReadSession(const TReadSession return Impl_->CreateReadSession(settings); } +std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings) { + return Impl_->CreateSimpleWriteSession(settings); +} + +std::shared_ptr<IWriteSession> TTopicClient::CreateWriteSession(const TWriteSessionSettings& settings) { + return Impl_->CreateWriteSession(settings); +} + } // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp index b18aab25301..7233e9b945b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp @@ -4,6 +4,7 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> #include "read_session.h" +#include "write_session.h" namespace NYdb::NTopic { @@ -23,7 +24,45 @@ std::shared_ptr<IReadSession> TTopicClient::TImpl::CreateReadSession(const TRead auto session = std::make_shared<TReadSession>(maybeSettings.GetOrElse(settings), shared_from_this(), Connections_, DbDriverState_); session->Start(); return std::move(session); - // return std::make_shared<TDummyReadSession>(settings); +} + +std::shared_ptr<IWriteSession> TTopicClient::TImpl::CreateWriteSession( + const TWriteSessionSettings& settings +) { + TMaybe<TWriteSessionSettings> maybeSettings; + if (!settings.CompressionExecutor_ || !settings.EventHandlers_.HandlersExecutor_) { + maybeSettings = settings; + with_lock (Lock) { + if (!settings.CompressionExecutor_) { + maybeSettings->CompressionExecutor(Settings.DefaultCompressionExecutor_); + } + if (!settings.EventHandlers_.HandlersExecutor_) { + maybeSettings->EventHandlers_.HandlersExecutor(Settings.DefaultHandlersExecutor_); + } + } + } + auto session = std::make_shared<TWriteSession>( + maybeSettings.GetOrElse(settings), shared_from_this(), Connections_, DbDriverState_ + ); + session->Start(TDuration::Zero()); + return std::move(session); +} + +std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::TImpl::CreateSimpleWriteSession( + const TWriteSessionSettings& settings +) { + auto alteredSettings = settings; + with_lock (Lock) { + alteredSettings.EventHandlers_.HandlersExecutor(Settings.DefaultHandlersExecutor_); + if (!settings.CompressionExecutor_) { + alteredSettings.CompressionExecutor(Settings.DefaultCompressionExecutor_); + } + } + + auto session = std::make_shared<TSimpleBlockingWriteSession>( + alteredSettings, shared_from_this(), Connections_, DbDriverState_ + ); + return std::move(session); } std::shared_ptr<TTopicClient::TImpl::IReadSessionConnectionProcessorFactory> TTopicClient::TImpl::CreateReadSessionConnectionProcessorFactory() { @@ -33,4 +72,11 @@ std::shared_ptr<TTopicClient::TImpl::IReadSessionConnectionProcessorFactory> TTo return NPersQueue::CreateConnectionProcessorFactory<TService, TRequest, TResponse>(&TService::Stub::AsyncStreamRead, Connections_, DbDriverState_); } +std::shared_ptr<TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory> TTopicClient::TImpl::CreateWriteSessionConnectionProcessorFactory() { + using TService = Ydb::Topic::V1::TopicService; + using TRequest = Ydb::Topic::StreamWriteMessage::FromClient; + using TResponse = Ydb::Topic::StreamWriteMessage::FromServer; + return NPersQueue::CreateConnectionProcessorFactory<TService, TRequest, TResponse>(&TService::Stub::AsyncStreamWrite, Connections_, DbDriverState_); +} + } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index 15589ab1601..f24800c9331 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -201,6 +201,8 @@ public: // Runtime API. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); + std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings); + std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings); using IReadSessionConnectionProcessorFactory = NYdb::NPersQueue::ISessionConnectionProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, @@ -208,6 +210,12 @@ public: std::shared_ptr<IReadSessionConnectionProcessorFactory> CreateReadSessionConnectionProcessorFactory(); + using IWriteSessionConnectionProcessorFactory = + NYdb::NPersQueue::ISessionConnectionProcessorFactory<Ydb::Topic::StreamWriteMessage::FromClient, + Ydb::Topic::StreamWriteMessage::FromServer>; + + std::shared_ptr<IWriteSessionConnectionProcessorFactory> CreateWriteSessionConnectionProcessorFactory(); + NGrpc::IQueueClientContextPtr CreateContext() { return Connections_->CreateContext(); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp new file mode 100644 index 00000000000..83103a1ec98 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -0,0 +1,1219 @@ +#include "write_session.h" +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <library/cpp/string_utils/url/url.h> + +#include <google/protobuf/util/time_util.h> + +#include <util/generic/store_policy.h> +#include <util/generic/utility.h> +#include <util/stream/buffer.h> + + +namespace NYdb::NTopic { +using ::NMonitoring::TDynamicCounterPtr; +using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; + + +const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1); + +namespace NCompressionDetails { + THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); +} + +#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) +TWriterCounters::TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { + Errors = counters->GetCounter("errors", true); + CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false); + BytesWritten = counters->GetCounter("bytesWritten", true); + MessagesWritten = counters->GetCounter("messagesWritten", true); + BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true); + BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false); + BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false); + BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false); + MessagesInflight = counters->GetCounter("messagesInflight", false); + + TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP); + UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP); + CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); +} +#undef HISTOGRAM_SETUP + +TWriteSession::TWriteSession( + const TWriteSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState) + : Settings(settings) + , Client(std::move(client)) + , Connections(std::move(connections)) + , DbDriverState(std::move(dbDriverState)) + , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") + , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) + , InitSeqNoPromise(NThreading::NewPromise<ui64>()) + , WakeupInterval( + Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? + std::min(Settings.BatchFlushInterval_.GetOrElse(TDuration::Seconds(1)) / 5, TDuration::MilliSeconds(100)) + : + TDuration::MilliSeconds(100) + ) +{ + if (!Settings.RetryPolicy_) { + Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); + } + if (Settings.Counters_.Defined()) { + Counters = *Settings.Counters_; + } else { + Counters = MakeIntrusive<TWriterCounters>(new ::NMonitoring::TDynamicCounters()); + } + +} + +void TWriteSession::Start(const TDuration& delay) { + ++ConnectionAttemptsDone; + if (!Started) { + HandleWakeUpImpl(); + InitWriter(); + } + Started = true; + + DoConnect(delay, DbDriverState->DiscoveryEndpoint); +} + +// Only called under lock +TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& status) { + THandleResult result; + if (AtomicGet(Aborting)) { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + return result; + } + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Got error. Status: " << status.Status + << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues) + ); + SessionEstablished = false; + TMaybe<TDuration> nextDelay = TDuration::Zero(); + if (!RetryState) { + RetryState = Settings.RetryPolicy_->CreateRetryState(); + } + nextDelay = RetryState->GetNextRetryDelay(status.Status); + + if (nextDelay) { + result.StartDelay = *nextDelay; + result.DoRestart = true; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" + ); + ResetForRetryImpl(); + + } else { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + result.DoStop = true; + CheckHandleResultImpl(result); + } + return result; +} + +void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet as well. + CompressionExecutor = Settings.CompressionExecutor_; + IExecutor::TPtr executor; + executor = CreateSyncExecutor(); + executor->Start(); + Executor = std::move(executor); + + Settings.CompressionExecutor_->Start(); + Settings.EventHandlers_.HandlersExecutor_->Start(); + +} +// Client method +NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() { + if (Settings.ValidateSeqNo_) { + if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); + } + else + AutoSeqNoMode = false; + } + return InitSeqNoPromise.GetFuture(); +} + +TString DebugString(const TWriteSessionEvent::TEvent& event) { + return std::visit([](const auto& ev) { return ev.DebugString(); }, event); +} + +// Client method +TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) { + return EventsQueue->GetEvent(block); +} + +// Client method +TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { + return EventsQueue->GetEvents(block, maxEventsCount); +} + +// Only called under lock +ui64 TWriteSession::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { + ui64 seqNoValue = LastSeqNo + 1; + if (!AutoSeqNoMode.Defined()) { + AutoSeqNoMode = !seqNo.Defined(); + //! Disable SeqNo shift for manual SeqNo mode; + if (seqNo.Defined()) { + OnSeqNoShift = false; + SeqNoShift = 0; + } + } + if (seqNo.Defined()) { + if (*AutoSeqNoMode) { + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); + ThrowFatalError( + "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); + } else { + seqNoValue = *seqNo; + } + } else if (!(*AutoSeqNoMode)) { + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); + ThrowFatalError( + "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); + } + LastSeqNo = seqNoValue; + return seqNoValue; +} +inline void TWriteSession::CheckHandleResultImpl(THandleResult& result) { + result.DoSetSeqNo = result.DoStop && !InitSeqNoSetDone && (InitSeqNoSetDone = true); +} + +void TWriteSession::ProcessHandleResult(THandleResult& result) { + if (result.DoRestart) { + Start(result.StartDelay); + } else if (result.DoSetSeqNo) { + InitSeqNoPromise.SetException("session closed"); + } +} + +NThreading::TFuture<void> TWriteSession::WaitEvent() { + return EventsQueue->WaitEvent(); +} + +// Client method. +void TWriteSession::WriteInternal( + TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now(); + bool readyToAccept = false; + size_t bufferSize = data.size(); + with_lock(Lock) { + CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); + + FlushWriteIfRequiredImpl(); + readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; + } + if (readyToAccept) { + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } +} + +// Client method. +void TWriteSession::WriteEncoded( + TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); +} + +void TWriteSession::Write( + TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); +} + + +// Only called under lock. +TWriteSession::THandleResult TWriteSession::OnErrorImpl(NYdb::TPlainStatus&& status) { + (*Counters->Errors)++; + auto result = RestartImpl(status); + if (result.DoStop) { + CloseImpl(status.Status, std::move(status.Issues)); + } + return result; +} + +// No lock +void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); + + NGrpc::IQueueClientContextPtr prevConnectContext; + NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; + NGrpc::IQueueClientContextPtr prevConnectDelayContext; + NGrpc::IQueueClientContextPtr connectContext = nullptr; + NGrpc::IQueueClientContextPtr connectDelayContext = nullptr; + NGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr; + TRpcRequestSettings reqSettings; + std::shared_ptr<IWriteSessionConnectionProcessorFactory> connectionFactory; + with_lock(Lock) { + ++ConnectionGeneration; + auto subclient = Client; + connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); + auto clientContext = subclient->CreateContext(); + ConnectionFactory = connectionFactory; + + ClientContext = std::move(clientContext); + ServerMessage = std::make_shared<TServerMessage>(); + + if (!ClientContext) { + AbortImpl(); + // Grpc and WriteSession is closing right now. + return; + } + + connectContext = ClientContext->CreateContext(); + if (delay) + connectDelayContext = ClientContext->CreateContext(); + connectTimeoutContext = ClientContext->CreateContext(); + + // Previous operations contexts. + + // Set new context + prevConnectContext = std::exchange(ConnectContext, connectContext); + prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); + prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext); + Y_ASSERT(ConnectContext); + Y_ASSERT(ConnectTimeoutContext); + + // Cancel previous operations. + NPersQueue::Cancel(prevConnectContext); + if (prevConnectDelayContext) + NPersQueue::Cancel(prevConnectDelayContext); + NPersQueue::Cancel(prevConnectTimeoutContext); + Y_ASSERT(connectContext); + Y_ASSERT(connectTimeoutContext); + reqSettings = TRpcRequestSettings::Make(Settings); + } + auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext] + (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); + } + }; + + auto connectTimeoutCallback = [weakThis = weak_from_this(), connectTimeoutContext = connectTimeoutContext] + (bool ok) { + if (ok) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnConnectTimeout(connectTimeoutContext); + } + } + }; + + connectionFactory->CreateProcessor( + std::move(connectCallback), + reqSettings, + std::move(connectContext), + TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. + std::move(connectTimeoutContext), + std::move(connectTimeoutCallback), + delay, + std::move(connectDelayContext) + ); +} + +// RPC callback. +void TWriteSession::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); + THandleResult handleResult; + with_lock (Lock) { + if (ConnectTimeoutContext == connectTimeoutContext) { + NPersQueue::Cancel(ConnectContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + } else { + return; + } + TStringBuilder description; + description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; + handleResult = RestartImpl(TPlainStatus(EStatus::TIMEOUT, description)); + if (handleResult.DoStop) { + CloseImpl( + EStatus::TIMEOUT, + description + ); + } + } + ProcessHandleResult(handleResult); +} + +// RPC callback. +void TWriteSession::OnConnect( + TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext +) { + THandleResult handleResult; + with_lock (Lock) { + if (ConnectContext == connectContext) { + NPersQueue::Cancel(ConnectTimeoutContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + + if (st.Ok()) { + Processor = std::move(processor); + InitImpl(); + // Still should call ReadFromProcessor(); + } + } else { + return; + } + if (!st.Ok()) { + handleResult = RestartImpl(st); + if (handleResult.DoStop) { + CloseImpl( + st.Status, + NPersQueue::MakeIssueWithSubIssues( + TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint + << "\". Attempts done: " << ConnectionAttemptsDone, + st.Issues + ) + ); + } + } + } + if (st.Ok()) + ReadFromProcessor(); // Out of Init + ProcessHandleResult(handleResult); +} + +// Produce init request for session. +// Only called under lock. +void TWriteSession::InitImpl() { + TClientMessage req; + auto* init = req.mutable_init_request(); + init->set_path(Settings.Path_); + init->set_producer_id(Settings.ProducerId_); + init->set_message_group_id(Settings.MessageGroupId_); + + for (const auto& attr : Settings.Meta_.Fields) { + (*init->mutable_write_session_meta())[attr.first] = attr.second; + } + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); + WriteToProcessorImpl(std::move(req)); +} + +// Called under lock. Invokes Processor->Write, which is assumed to be deadlock-safe +void TWriteSession::WriteToProcessorImpl(TWriteSession::TClientMessage&& req) { + Y_ASSERT(Processor); + if (Aborting) + return; + auto callback = [weakThis = weak_from_this(), connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); + } + }; + + Processor->Write(std::move(req), callback); +} + +void TWriteSession::ReadFromProcessor() { + Y_ASSERT(Processor); + IProcessor::TPtr prc; + ui64 generation; + with_lock(Lock) { + prc = Processor; + generation = ConnectionGeneration; + } + auto callback = [weakThis = weak_from_this(), connectionGeneration = generation, processor = prc, serverMessage = ServerMessage] + (NGrpc::TGrpcStatus&& grpcStatus) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); + } + }; + prc->Read(ServerMessage.get(), std::move(callback)); +} + +void TWriteSession::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) { + THandleResult handleResult; + with_lock (Lock) { + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (Aborting) { + return; + } + if(!status.Ok()) { + handleResult = OnErrorImpl(status); + } + } + ProcessHandleResult(handleResult); +} + +void TWriteSession::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { + TPlainStatus errorStatus; + TProcessSrvMessageResult processResult; + bool needSetValue = false; + if (!grpcStatus.Ok()) { + errorStatus = TPlainStatus(std::move(grpcStatus)); + } + bool doRead = false; + with_lock (Lock) { + UpdateTimedCountersImpl(); + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (errorStatus.Ok()) { + if (NPersQueue::IsErrorMessage(*ServerMessage)) { + errorStatus = NPersQueue::MakeErrorFromProto(*ServerMessage); + } else { + processResult = ProcessServerMessageImpl(); + needSetValue = !InitSeqNoSetDone && processResult.InitSeqNo.Defined() && (InitSeqNoSetDone = true); + if (errorStatus.Ok() && processResult.Ok) { + doRead = true; + } + } + } + } + if (doRead) + ReadFromProcessor(); + + with_lock(Lock) { + if (!errorStatus.Ok()) { + if (processResult.Ok) { // Otherwise, OnError was already called + processResult.HandleResult = RestartImpl(errorStatus); + } + } + if (processResult.HandleResult.DoStop) { + CloseImpl(std::move(errorStatus)); + } + } + for (auto& event : processResult.Events) { + EventsQueue->PushEvent(std::move(event)); + } + if (needSetValue) { + InitSeqNoPromise.SetValue(*processResult.InitSeqNo); + processResult.HandleResult.DoSetSeqNo = false; // Redundant. Just in case. + } + ProcessHandleResult(processResult.HandleResult); +} + +TStringBuilder TWriteSession::LogPrefix() const { + return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; +} + +TString TWriteSessionEvent::TAcksEvent::DebugString() const { + TStringBuilder res; + res << "AcksEvent:"; + for (auto& ack : Acks) { + res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State; + if (ack.Details) { + res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId; + } + res << " }"; + } + if (!Acks.empty() && Acks.back().Stat) { + auto& stat = Acks.back().Stat; + res << " write stat: Write time " << stat->WriteTime + << " minimal time in partition queue " << stat->MinTimeInPartitionQueue + << " maximal time in partition queue " << stat->MaxTimeInPartitionQueue + << " partition quoted time " << stat->PartitionQuotedTime + << " topic quoted time " << stat->TopicQuotedTime; + } + return res; +} + +TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const { + return "ReadyToAcceptEvent"; +} + + +TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl() { + TProcessSrvMessageResult result; + switch (ServerMessage->GetServerMessageCase()) { + case TServerMessage::SERVER_MESSAGE_NOT_SET: { + SessionEstablished = false; + result.HandleResult = OnErrorImpl({ + static_cast<NYdb::EStatus>(ServerMessage->status()), + {NYql::TIssue{ServerMessage->DebugString()}} + }); + result.Ok = false; + break; + } + case TServerMessage::kInitResponse: { + const auto& initResponse = ServerMessage->init_response(); + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + SessionId = initResponse.session_id(); + PartitionId = initResponse.partition_id(); + ui64 newLastSeqNo = initResponse.last_seq_no(); + // SeqNo increased, so there's a risk of loss, apply SeqNo shift. + // MinUnsentSeqNo must be > 0 if anything was ever sent yet + if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { + SeqNoShift = newLastSeqNo - MinUnsentSeqNo; + } + result.InitSeqNo = newLastSeqNo; + LastSeqNo = newLastSeqNo; + + SessionEstablished = true; + LastCountersUpdateTs = TInstant::Now(); + SessionStartedTs = TInstant::Now(); + OnErrorResolved(); + + //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + // Kickstart send after session reestablishment + SendImpl(); + break; + } + case TServerMessage::kWriteResponse: { + TWriteSessionEvent::TAcksEvent acksEvent; + const auto& batchWriteResponse = ServerMessage->write_response(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() + ); + TWriteStat::TPtr writeStat = new TWriteStat{}; + const auto& stat = batchWriteResponse.write_statistics(); + + auto durationConv = [](const ::google::protobuf::Duration& dur) { + return TDuration::MilliSeconds(::google::protobuf::util::TimeUtil::DurationToMilliseconds(dur)); + }; + + writeStat->WriteTime = durationConv(stat.persisting_time()); + writeStat->MinTimeInPartitionQueue = durationConv(stat.min_queue_wait_time()); + writeStat->MaxTimeInPartitionQueue = durationConv(stat.max_queue_wait_time()); + writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time()); + writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time()); + + for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) { + // TODO: Fill writer statistics + auto ack = batchWriteResponse.acks(messageIndex); + ui64 sequenceNumber = ack.seq_no(); + + Y_VERIFY(ack.has_written() || ack.has_skipped()); + auto msgWriteStatus = ack.has_written() + ? TWriteSessionEvent::TWriteAck::EES_WRITTEN + : (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN + ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN + : TWriteSessionEvent::TWriteAck::EES_DISCARDED); + + ui64 offset = ack.has_written() ? ack.written().offset() : 0; + + acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ + sequenceNumber - SeqNoShift, + msgWriteStatus, + TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { + offset, + PartitionId, + }, + writeStat, + }); + + if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } + } + //EventsQueue->PushEvent(std::move(acksEvent)); + result.Events.emplace_back(std::move(acksEvent)); + break; + } + case TServerMessage::kUpdateTokenResponse: { + UpdateTokenInProgress = false; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + UpdateTokenIfNeededImpl(); + break; + } + } + return result; +} + +bool TWriteSession::CleanupOnAcknowledged(ui64 sequenceNumber) { + bool result = false; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + UpdateTimedCountersImpl(); + const auto& sentFront = SentOriginalMessages.front(); + ui64 size = 0; + ui64 compressedSize = 0; + if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { + auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); + result = memoryUsage.NowOk && !memoryUsage.WasOk; + //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + const auto& front = SentPackedMessage.front(); + if (front.Compressed) { + compressedSize = front.Data.size(); + } else { + size = front.Data.size(); + } + + (*Counters->MessagesWritten) += front.MessageCount; + (*Counters->MessagesInflight) -= front.MessageCount; + (*Counters->BytesWritten) += front.OriginalSize; + + SentPackedMessage.pop(); + } else { + size = sentFront.Size; + (*Counters->BytesWritten) += sentFront.Size; + (*Counters->MessagesWritten)++; + (*Counters->MessagesInflight)--; + } + + (*Counters->BytesInflightCompressed) -= compressedSize; + (*Counters->BytesWrittenCompressed) += compressedSize; + (*Counters->BytesInflightUncompressed) -= size; + + Y_VERIFY(Counters->BytesInflightCompressed->Val() >= 0); + Y_VERIFY(Counters->BytesInflightUncompressed->Val() >= 0); + + Y_VERIFY(sentFront.SeqNo == sequenceNumber); + + (*Counters->BytesInflightTotal) = MemoryUsage; + SentOriginalMessages.pop(); + return result; +} + +// Only called under Lock +TMemoryUsageChange TWriteSession::OnMemoryUsageChangedImpl(i64 diff) { + bool wasOk = MemoryUsage <= Settings.MaxMemoryUsage_; + //if (diff < 0) { + // Y_VERIFY(MemoryUsage >= static_cast<size_t>(std::abs(diff))); + //} + MemoryUsage += diff; + bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; + if (wasOk != nowOk) { + if (wasOk) { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage " << MemoryUsage + << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" + ); + } + else { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" + ); + } + } + return {wasOk, nowOk}; +} + +TBuffer CompressBuffer(TVector<TStringBuf>& data, ECodec codec, i32 level) { + TBuffer result; + THolder<IOutputStream> coder = NCompressionDetails::CreateCoder(codec, result, level); + for (auto& buffer : data) { + coder->Write(buffer.data(), buffer.size()); + } + coder->Finish(); + return result; +} + +// May call OnCompressed with sync executor. No external lock. +void TWriteSession::CompressImpl(TBlock&& block_) { + auto weakThis = weak_from_this(); + bool isSyncCompression = !CompressionExecutor->IsAsync(); + Y_VERIFY(block_.Valid); + + std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); + blockPtr->Move(block_); + auto lambda = [weakThis, codec = Settings.Codec_, level = Settings.CompressionLevel_, + isSyncCompression, blockPtr]() mutable + { + if (auto sharedThis = weakThis.lock()) { + Y_VERIFY(!blockPtr->Compressed); + + auto compressedData = CompressBuffer( + blockPtr->OriginalDataRefs, codec, level + ); + Y_VERIFY(!compressedData.Empty()); + blockPtr->Data = std::move(compressedData); + blockPtr->Compressed = true; + blockPtr->CodecID = static_cast<ui32>(sharedThis->Settings.Codec_); + sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); + } + }; + + CompressionExecutor->Post(lambda); +} + +void TWriteSession::OnCompressed(TBlock&& block, bool isSyncCompression) { + TMemoryUsageChange memoryUsage; + if (!isSyncCompression) { + with_lock(Lock) { + memoryUsage = OnCompressedImpl(std::move(block)); + } + } else { + memoryUsage = OnCompressedImpl(std::move(block)); + } + if (memoryUsage.NowOk && !memoryUsage.WasOk) { + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } +} + +//Called under lock or synchronously if compression is sync +TMemoryUsageChange TWriteSession::OnCompressedImpl(TBlock&& block) { + UpdateTimedCountersImpl(); + Y_VERIFY(block.Valid); + auto memoryUsage = OnMemoryUsageChangedImpl(static_cast<i64>(block.Data.size()) - block.OriginalMemoryUsage); + (*Counters->BytesInflightUncompressed) -= block.OriginalSize; + (*Counters->BytesInflightCompressed) += block.Data.size(); + + PackedMessagesToSend.emplace(std::move(block)); + SendImpl(); + return memoryUsage; +} + +// Only called under lock +void TWriteSession::ResetForRetryImpl() { + SessionEstablished = false; + const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size(); + const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size(); + while (!SentPackedMessage.empty()) { + PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); + SentPackedMessage.pop(); + } + ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; + std::queue<TOriginalMessage> freshOriginalMessagesToSend; + OriginalMessagesToSend.swap(freshOriginalMessagesToSend); + while (!SentOriginalMessages.empty()) { + OriginalMessagesToSend.emplace(std::move(SentOriginalMessages.front())); + SentOriginalMessages.pop(); + } + while (!freshOriginalMessagesToSend.empty()) { + OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); + freshOriginalMessagesToSend.pop(); + } + if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) + minSeqNo = OriginalMessagesToSend.front().SeqNo; + MinUnsentSeqNo = minSeqNo; + Y_VERIFY(PackedMessagesToSend.size() == totalPackedMessages); + Y_VERIFY(OriginalMessagesToSend.size() == totalOriginalMessages); +} + +// Called from client Write() methods. With lock +void TWriteSession::FlushWriteIfRequiredImpl() { + + if (!CurrentBatch.Empty() && !CurrentBatch.FlushRequested) { + MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); + if (TInstant::Now() - CurrentBatch.StartedAt >= Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) + || CurrentBatch.CurrentSize >= Settings.BatchFlushSizeBytes_.GetOrElse(0) + || CurrentBatch.CurrentSize >= MaxBlockSize + || CurrentBatch.Messages.size() >= MaxBlockMessageCount + || CurrentBatch.HasCodec() + ) { + WriteBatchImpl(); + return; + } + } +} + + +// Involves compression, but still called under lock. +size_t TWriteSession::WriteBatchImpl() { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " + << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo + ); + + Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount); + + const bool skipCompression = Settings.Codec_ == ECodec::RAW || CurrentBatch.HasCodec(); + if (!skipCompression && Settings.CompressionExecutor_->IsAsync()) { + MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); + } + + size_t size = 0; + for (size_t i = 0; i != CurrentBatch.Messages.size();) { + TBlock block{}; + for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { + auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; + auto createTs = CurrentBatch.Messages[i].CreatedAt; + + if (!block.MessageCount) { + block.Offset = sequenceNumber; + } + + block.MessageCount += 1; + const auto& datum = CurrentBatch.Messages[i].DataRef; + block.OriginalSize += datum.size(); + block.OriginalMemoryUsage = CurrentBatch.Data.size(); + block.OriginalDataRefs.emplace_back(datum); + if (CurrentBatch.Messages[i].Codec.Defined()) { + Y_VERIFY(CurrentBatch.Messages.size() == 1); + block.CodecID = static_cast<ui32>(*CurrentBatch.Messages[i].Codec); + block.OriginalSize = CurrentBatch.Messages[i].OriginalSize; + block.Compressed = false; + } + size += datum.size(); + UpdateTimedCountersImpl(); + (*Counters->BytesInflightUncompressed) += datum.size(); + (*Counters->MessagesInflight)++; + OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + } + block.Data = std::move(CurrentBatch.Data); + if (skipCompression) { + PackedMessagesToSend.emplace(std::move(block)); + } else { + CompressImpl(std::move(block)); + } + } + CurrentBatch.Reset(); + if (skipCompression) { + SendImpl(); + } + return size; +} + +size_t GetMaxGrpcMessageSize() { + return 120_MB; +} + +bool TWriteSession::IsReadyToSendNextImpl() const { + if (!SessionEstablished) { + return false; + } + if (Aborting) + return false; + if (PackedMessagesToSend.empty()) { + return false; + } + Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); + Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); + + return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; +} + + +void TWriteSession::UpdateTokenIfNeededImpl() { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + + if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) + return; + TClientMessage clientMessage; + auto* updateRequest = clientMessage.mutable_update_token_request(); + auto token = DbDriverState->CredentialsProvider->GetAuthInfo(); + if (token == PrevToken) + return; + UpdateTokenInProgress = true; + updateRequest->set_token(token); + PrevToken = token; + + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + + Processor->Write(std::move(clientMessage)); +} + +void TWriteSession::SendImpl() { + // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB + while(IsReadyToSendNextImpl()) { + TClientMessage clientMessage; + auto* writeRequest = clientMessage.mutable_write_request(); + + // Sent blocks while we can without messages reordering + while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) { + const auto& block = PackedMessagesToSend.top(); + Y_VERIFY(block.Valid); + writeRequest->set_codec(static_cast<i32>(block.CodecID)); + Y_VERIFY(block.MessageCount == 1); + for (size_t i = 0; i != block.MessageCount; ++i) { + Y_VERIFY(!OriginalMessagesToSend.empty()); + + auto& message = OriginalMessagesToSend.front(); + + auto* msgData = writeRequest->add_messages(); + + + msgData->set_seq_no(message.SeqNo + SeqNoShift); + *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); + + SentOriginalMessages.emplace(std::move(message)); + OriginalMessagesToSend.pop(); + + msgData->set_uncompressed_size(block.OriginalSize); + if (block.Compressed) + msgData->set_data(block.Data.data(), block.Data.size()); + else { + for (auto& buffer: block.OriginalDataRefs) { + msgData->set_data(buffer.data(), buffer.size()); + } + } + } + + + TBlock moveBlock; + moveBlock.Move(block); + SentPackedMessage.emplace(std::move(moveBlock)); + PackedMessagesToSend.pop(); + } + UpdateTokenIfNeededImpl(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) (" + << OriginalMessagesToSend.size() << " left), first sequence number is " + << writeRequest->messages(0).seq_no() + ); + Processor->Write(std::move(clientMessage)); + } +} + +// Client method, no Lock +bool TWriteSession::Close(TDuration closeTimeout) { + if (AtomicGet(Aborting)) + return false; + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" + ); + auto startTime = TInstant::Now(); + auto remaining = closeTimeout; + bool ready = false; + bool needSetSeqNoValue = false; + while (remaining > TDuration::Zero()) { + with_lock(Lock) { + if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) { + ready = true; + } + if (AtomicGet(Aborting)) + break; + } + if (ready) { + break; + } + remaining = closeTimeout - (TInstant::Now() - startTime); + Sleep(Min(TDuration::MilliSeconds(100), remaining)); + } + with_lock(Lock) { + ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting); + } + with_lock(Lock) { + CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); + needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + } + if (needSetSeqNoValue) { + InitSeqNoPromise.SetException("session closed"); + } + if (ready) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); + } else { + DbDriverState->Log.Write( + TLOG_WARNING, + LogPrefix() << "Write session: could not confirm all writes in time" + << " or session aborted, perform hard shutdown" + ); + } + return ready; +} + +void TWriteSession::HandleWakeUpImpl() { + FlushWriteIfRequiredImpl(); + if (AtomicGet(Aborting)) { + return; + } + auto callback = [weakThis = this->weak_from_this()] (bool ok) + { + if (!ok) + return; + if (auto sharedThis = weakThis.lock()) { + with_lock(sharedThis->Lock) { + sharedThis->HandleWakeUpImpl(); + } + } + }; + auto enqueueTokenCallback = [weakThis = this->weak_from_this()] (bool ok) { + if (!ok) + return; + if (auto sharedThis = weakThis.lock()) { + sharedThis->EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } + }; + if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { + LastTokenUpdate = TInstant::Now(); + UpdateTokenIfNeededImpl(); + } + + const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() + ? WakeupInterval + : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); + Connections->ScheduleCallback(flushAfter, std::move(callback)); +} + +void TWriteSession::UpdateTimedCountersImpl() { + auto now = TInstant::Now(); + auto delta = (now - LastCountersUpdateTs).MilliSeconds(); + double percent = 100.0 / Settings.MaxMemoryUsage_; + + Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); + Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); + Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); + + *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); + LastCountersUpdateTs = now; + if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { + LastCountersLogTs = TInstant::Now(); + +#define LOG_COUNTER(counter) \ + << " " Y_STRINGIZE(counter) ": " \ + << Counters->counter->Val() \ + /**/ + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() + << "Counters: {" + LOG_COUNTER(Errors) + LOG_COUNTER(CurrentSessionLifetimeMs) + LOG_COUNTER(BytesWritten) + LOG_COUNTER(MessagesWritten) + LOG_COUNTER(BytesWrittenCompressed) + LOG_COUNTER(BytesInflightUncompressed) + LOG_COUNTER(BytesInflightCompressed) + LOG_COUNTER(BytesInflightTotal) + LOG_COUNTER(MessagesInflight) + << " }" + ); + +#undef LOG_COUNTER + } +} + +void TWriteSession::AbortImpl() { + if (!AtomicGet(Aborting)) { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + AtomicSet(Aborting, 1); + NPersQueue::Cancel(ConnectContext); + NPersQueue::Cancel(ConnectTimeoutContext); + NPersQueue::Cancel(ConnectDelayContext); + if (Processor) + Processor->Cancel(); + + NPersQueue::Cancel(ClientContext); + ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. + } +} + +void TWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); + AbortImpl(); +} + +void TWriteSession::CloseImpl(EStatus statusCode, const TString& message) { + NYql::TIssues issues; + issues.AddIssue(message); + CloseImpl(statusCode, std::move(issues)); +} + +void TWriteSession::CloseImpl(TPlainStatus&& status) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + EventsQueue->Close(TSessionClosedEvent(std::move(status))); + AbortImpl(); +} + +TWriteSession::~TWriteSession() { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); + bool needClose = false; + with_lock(Lock) { + if (!AtomicGet(Aborting)) { + CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); + + needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + } + } + if (needClose) + InitSeqNoPromise.SetException("session closed"); +} + +TSimpleBlockingWriteSession::TSimpleBlockingWriteSession( + const TWriteSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState +) { + auto alteredSettings = settings; + alteredSettings.EventHandlers_.AcksHandler_ = [this](TWriteSessionEvent::TAcksEvent& event) {this->HandleAck(event); }; + alteredSettings.EventHandlers_.ReadyToAcceptHander_ = [this](TWriteSessionEvent::TReadyToAcceptEvent& event) + {this->HandleReady(event); }; + alteredSettings.EventHandlers_.SessionClosedHandler_ = [this](const TSessionClosedEvent& event) {this->HandleClosed(event); }; + + Writer = std::make_shared<TWriteSession>( + alteredSettings, client, connections, dbDriverState + ); + Writer->Start(TDuration::Max()); +} + +ui64 TSimpleBlockingWriteSession::GetInitSeqNo() { + return Writer->GetInitSeqNo().GetValueSync(); +} + +bool TSimpleBlockingWriteSession::Write( + TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp, const TDuration& blockTimeout +) { + if (!IsAlive()) + return false; + + auto continuationToken = WaitForToken(blockTimeout); + if (continuationToken.Defined()) { + Writer->Write(std::move(*continuationToken), std::move(data), seqNo, createTimestamp); + return true; + } + return false; +} + +TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) { + auto startTime = TInstant::Now(); + TDuration remainingTime = timeout; + TMaybe<TContinuationToken> token = Nothing(); + while(!token.Defined() && remainingTime > TDuration::Zero()) { + with_lock(Lock) { + if (!ContinueTokens.empty()) { + token = std::move(ContinueTokens.front()); + ContinueTokens.pop(); + } + } + if (!IsAlive()) + return Nothing(); + + if (token.Defined()) { + return std::move(*token); + } + else { + remainingTime = timeout - (TInstant::Now() - startTime); + Sleep(Min(remainingTime, TDuration::MilliSeconds(100))); + } + } + return Nothing(); +} + +TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() { + return Writer->GetCounters(); +} + + +bool TSimpleBlockingWriteSession::IsAlive() const { + bool closed = false; + with_lock(Lock) { + closed = Closed; + } + return !closed; +} + +void TSimpleBlockingWriteSession::HandleAck(TWriteSessionEvent::TAcksEvent& event) { + Y_UNUSED(event); +} + +void TSimpleBlockingWriteSession::HandleReady(TWriteSessionEvent::TReadyToAcceptEvent& event) { + with_lock(Lock) { + ContinueTokens.emplace(std::move(event.ContinuationToken)); + } +} +void TSimpleBlockingWriteSession::HandleClosed(const TSessionClosedEvent&) { + with_lock(Lock) { + Closed = true; + } +} +bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) { + return Writer->Close(std::move(closeTimeout)); +} + +}; // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h new file mode 100644 index 00000000000..bfe94e21458 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -0,0 +1,464 @@ +#pragma once + +#include "topic_impl.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <util/generic/buffer.h> + + +namespace NYdb::NTopic { + +inline const TString& GetCodecId(const ECodec codec) { + static THashMap<ECodec, TString> idByCodec{ + {ECodec::RAW, TString(1, '\0')}, + {ECodec::GZIP, "\1"}, + {ECodec::LZOP, "\2"}, + {ECodec::ZSTD, "\3"} + }; + Y_VERIFY(idByCodec.contains(codec)); + return idByCodec[codec]; +} + +class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> { + using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; + +public: + TWriteSessionEventsQueue(const TWriteSessionSettings& settings) + : TParent(settings) + {} + + void PushEvent(TEventInfo eventInfo) { + if (Closed || ApplyHandler(eventInfo)) { + return; + } + + NPersQueue::TWaiter waiter; + with_lock (Mutex) { + Events.emplace(std::move(eventInfo)); + waiter = PopWaiterImpl(); + } + waiter.Signal(); // Does nothing if waiter is empty. + } + + TMaybe<TEvent> GetEvent(bool block = false) { + TMaybe<TEventInfo> eventInfo; + with_lock (Mutex) { + if (block) { + WaitEventsImpl(); + } + if (HasEventsImpl()) { + eventInfo = GetEventImpl(); + } else { + return Nothing(); + } + } + eventInfo->OnUserRetrievedEvent(); + return std::move(eventInfo->Event); + } + + TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) { + TVector<TEventInfo> eventInfos; + with_lock (Mutex) { + if (block) { + WaitEventsImpl(); + } + eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max())); + while (!Events.empty()) { + eventInfos.emplace_back(GetEventImpl()); + if (maxEventsCount && eventInfos.size() >= *maxEventsCount) { + break; + } + } + if (CloseEvent && Events.empty() && (!maxEventsCount || eventInfos.size() < *maxEventsCount)) { + eventInfos.push_back({*CloseEvent}); + } + } + + TVector<TEvent> result; + result.reserve(eventInfos.size()); + for (TEventInfo& eventInfo : eventInfos) { + eventInfo.OnUserRetrievedEvent(); + result.emplace_back(std::move(eventInfo.Event)); + } + return result; + } + + void Close(const TSessionClosedEvent& event) { + NPersQueue::TWaiter waiter; + with_lock (Mutex) { + CloseEvent = event; + Closed = true; + waiter = NPersQueue::TWaiter(Waiter.ExtractPromise(), this); + } + + TEventInfo info(event); + ApplyHandler(info); + + waiter.Signal(); + } + +private: + struct THandlersVisitor : public TParent::TBaseHandlersVisitor { + using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; +#define DECLARE_HANDLER(type, handler, answer) \ + bool operator()(type& event) { \ + if (Settings.EventHandlers_.handler) { \ + Settings.EventHandlers_.handler(event); \ + return answer; \ + } \ + return false; \ + } \ + /**/ + DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); + DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); + DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied + +#undef DECLARE_HANDLER + bool Visit() { + return std::visit(*this, EventInfo.Event); + } + + }; + + bool ApplyHandler(TEventInfo& eventInfo) { + THandlersVisitor visitor(Settings, eventInfo); + return visitor.Visit(); + } + + TEventInfo GetEventImpl() { // Assumes that we're under lock and that the event queue has events. + Y_ASSERT(HasEventsImpl()); + if (!Events.empty()) { + TEventInfo event = std::move(Events.front()); + Events.pop(); + RenewWaiterImpl(); + return event; + } + Y_ASSERT(CloseEvent); + return {*CloseEvent}; + } +}; + +struct TMemoryUsageChange { + bool WasOk; //!< MemoryUsage <= Config.MaxMemoryUsage_ before update + bool NowOk; //!< Same, only after update +}; + +class TWriteSession : public IWriteSession, + public std::enable_shared_from_this<TWriteSession> { +private: + friend class TSimpleBlockingWriteSession; + friend class TTopicClient; + + using TClientMessage = Ydb::Topic::StreamWriteMessage::FromClient; + using TServerMessage = Ydb::Topic::StreamWriteMessage::FromServer; + using IWriteSessionConnectionProcessorFactory = + TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory; + using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; + + struct TMessage { + ui64 SeqNo; + TInstant CreatedAt; + TStringBuf DataRef; + TMaybe<ECodec> Codec; + ui32 OriginalSize; // only for coded messages + TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) + : SeqNo(seqNo) + , CreatedAt(createdAt) + , DataRef(data) + , Codec(codec) + , OriginalSize(originalSize) + {} + }; + + struct TMessageBatch { + TBuffer Data; + TVector<TMessage> Messages; + ui64 CurrentSize = 0; + TInstant StartedAt = TInstant::Zero(); + bool Acquired = false; + bool FlushRequested = false; + void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { + if (StartedAt == TInstant::Zero()) + StartedAt = TInstant::Now(); + CurrentSize += codec ? originalSize : data.size(); + Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); + Acquired = false; + } + + bool HasCodec() const { + return Messages.empty() ? false : Messages.front().Codec.Defined(); + } + + bool Acquire() { + if (Acquired || Messages.empty()) + return false; + auto currSize = Data.size(); + Data.Append(Messages.back().DataRef.data(), Messages.back().DataRef.size()); + Messages.back().DataRef = TStringBuf(Data.data() + currSize, Data.size() - currSize); + Acquired = true; + return true; + } + + bool Empty() const noexcept { + return CurrentSize == 0 && Messages.empty(); + } + + void Reset() { + StartedAt = TInstant::Zero(); + Messages.clear(); + Data.Clear(); + Acquired = false; + CurrentSize = 0; + FlushRequested = false; + } + }; + + struct TBlock { + size_t Offset = 0; //!< First message sequence number in the block + size_t MessageCount = 0; + size_t PartNumber = 0; + size_t OriginalSize = 0; + size_t OriginalMemoryUsage = 0; + ui32 CodecID = static_cast<ui32>(ECodec::RAW); + mutable TVector<TStringBuf> OriginalDataRefs; + mutable TBuffer Data; + bool Compressed = false; + mutable bool Valid = true; + + TBlock& operator=(TBlock&&) = default; + TBlock(TBlock&&) = default; + TBlock() = default; + + //For taking ownership by copying from const object, f.e. lambda -> std::function, priority_queue + void Move(const TBlock& rhs) { + Offset = rhs.Offset; + MessageCount = rhs.MessageCount; + PartNumber = rhs.PartNumber; + OriginalSize = rhs.OriginalSize; + OriginalMemoryUsage = rhs.OriginalMemoryUsage; + CodecID = rhs.CodecID; + OriginalDataRefs.swap(rhs.OriginalDataRefs); + Data.Swap(rhs.Data); + Compressed = rhs.Compressed; + + rhs.Data.Clear(); + rhs.OriginalDataRefs.clear(); + } + }; + + struct TOriginalMessage { + ui64 SeqNo; + TInstant CreatedAt; + size_t Size; + TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) + : SeqNo(sequenceNumber) + , CreatedAt(createdAt) + , Size(size) + {} + }; + + //! Block comparer, makes block with smallest offset (first sequence number) appear on top of the PackedMessagesToSend priority queue + struct Greater { + bool operator() (const TBlock& lhs, const TBlock& rhs) { + return lhs.Offset > rhs.Offset; + } + }; + + struct THandleResult { + bool DoRestart = false; + TDuration StartDelay = TDuration::Zero(); + bool DoStop = false; + bool DoSetSeqNo = false; + }; + struct TProcessSrvMessageResult { + THandleResult HandleResult; + TMaybe<ui64> InitSeqNo; + TVector<TWriteSessionEvent::TEvent> Events; + bool Ok = true; + }; + + THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action + +public: + TWriteSession(const TWriteSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState); + + TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; + TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, + TMaybe<size_t> maxEventsCount = Nothing()) override; + NThreading::TFuture<ui64> GetInitSeqNo() override; + + void Write(TContinuationToken&& continuationToken, TStringBuf data, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + + NThreading::TFuture<void> WaitEvent() override; + + // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. + bool Close(TDuration closeTimeout = TDuration::Max()) override; + + TWriterCounters::TPtr GetCounters() override {Y_FAIL("Unimplemented"); } //ToDo - unimplemented; + + ~TWriteSession(); // will not call close - destroy everything without acks + +private: + + TStringBuilder LogPrefix() const; + + void UpdateTokenIfNeededImpl(); + + void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); + + void FlushWriteIfRequiredImpl(); + size_t WriteBatchImpl(); + void Start(const TDuration& delay); + void InitWriter(); + + void OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor, + const NGrpc::IQueueClientContextPtr& connectContext); + void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext); + void ResetForRetryImpl(); + THandleResult RestartImpl(const TPlainStatus& status); + void DoConnect(const TDuration& delay, const TString& endpoint); + void InitImpl(); + void ReadFromProcessor(); // Assumes that we're under lock. + void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock. + void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration); + void OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration); + TProcessSrvMessageResult ProcessServerMessageImpl(); + TMemoryUsageChange OnMemoryUsageChangedImpl(i64 diff); + void CompressImpl(TBlock&& block); + void OnCompressed(TBlock&& block, bool isSyncCompression=false); + TMemoryUsageChange OnCompressedImpl(TBlock&& block); + + //TString GetDebugIdentity() const; + TClientMessage GetInitClientMessage(); + bool CleanupOnAcknowledged(ui64 sequenceNumber); + bool IsReadyToSendNextImpl() const; + ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); + void SendImpl(); + void AbortImpl(); + void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); + void CloseImpl(EStatus statusCode, const TString& message); + void CloseImpl(TPlainStatus&& status); + + void OnErrorResolved() { + RetryState = nullptr; + } + void CheckHandleResultImpl(THandleResult& result); + void ProcessHandleResult(THandleResult& result); + void HandleWakeUpImpl(); + void UpdateTimedCountersImpl(); + +private: + TWriteSessionSettings Settings; + std::shared_ptr<TTopicClient::TImpl> Client; + std::shared_ptr<TGRpcConnectionsImpl> Connections; + TString TargetCluster; + TString InitialCluster; + TString CurrentCluster; + bool OnSeqNoShift = false; + TString PreferredClusterByCDS; + std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; + TDbDriverStatePtr DbDriverState; + TStringType PrevToken; + bool UpdateTokenInProgress = false; + TInstant LastTokenUpdate = TInstant::Zero(); + std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; + NGrpc::IQueueClientContextPtr ClientContext; // Common client context. + NGrpc::IQueueClientContextPtr ConnectContext; + NGrpc::IQueueClientContextPtr ConnectTimeoutContext; + NGrpc::IQueueClientContextPtr ConnectDelayContext; + size_t ConnectionGeneration = 0; + size_t ConnectionAttemptsDone = 0; + TAdaptiveLock Lock; + IProcessor::TPtr Processor; + IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). + std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to. + + TString SessionId; + IExecutor::TPtr Executor; + IExecutor::TPtr CompressionExecutor; + size_t MemoryUsage = 0; //!< Estimated amount of memory used + + TMessageBatch CurrentBatch; + + std::queue<TOriginalMessage> OriginalMessagesToSend; + std::priority_queue<TBlock, std::vector<TBlock>, Greater> PackedMessagesToSend; + //! Messages that are sent but yet not acknowledged + std::queue<TOriginalMessage> SentOriginalMessages; + std::queue<TBlock> SentPackedMessage; + + const size_t MaxBlockSize = std::numeric_limits<size_t>::max(); + const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility + bool Connected = false; + bool Started = false; + TAtomic Aborting = 0; + bool SessionEstablished = false; + ui32 PartitionId = 0; + ui64 LastSeqNo = 0; + ui64 MinUnsentSeqNo = 0; + ui64 SeqNoShift = 0; + TMaybe<bool> AutoSeqNoMode; + bool ValidateSeqNoMode = false; + + NThreading::TPromise<ui64> InitSeqNoPromise; + bool InitSeqNoSetDone = false; + TInstant SessionStartedTs; + TInstant LastCountersUpdateTs = TInstant::Zero(); + TInstant LastCountersLogTs; + TWriterCounters::TPtr Counters; + TDuration WakeupInterval; + +protected: + ui64 MessagesAcquired = 0; +}; + +class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { +private: + using TClientMessage = TWriteSession::TClientMessage; + using TServerMessage = TWriteSession::TServerMessage; + using IWriteSessionConnectionProcessorFactory = TWriteSession::IWriteSessionConnectionProcessorFactory; + using IProcessor = TWriteSession::IProcessor; + +public: + TSimpleBlockingWriteSession( + const TWriteSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState); + + bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(), + const TDuration& blockTimeout = TDuration::Max()) override; + + ui64 GetInitSeqNo() override; + + bool Close(TDuration closeTimeout = TDuration::Max()) override; + bool IsAlive() const override; + + TWriterCounters::TPtr GetCounters() override; + +protected: + std::shared_ptr<TWriteSession> Writer; + +private: + TMaybe<TContinuationToken> WaitForToken(const TDuration& timeout); + void HandleAck(TWriteSessionEvent::TAcksEvent&); + void HandleReady(TWriteSessionEvent::TReadyToAcceptEvent&); + void HandleClosed(const TSessionClosedEvent&); + + TAdaptiveLock Lock; + std::queue<TContinuationToken> ContinueTokens; + bool Closed = false; +}; + + +}; // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index db79186e70b..c6784a7f9cb 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -437,6 +437,53 @@ struct TSessionClosedEvent: public TStatus { TString DebugString() const; }; +struct TWriteStat : public TThrRefBase { + TDuration WriteTime; + TDuration MinTimeInPartitionQueue; + TDuration MaxTimeInPartitionQueue; + TDuration PartitionQuotedTime; + TDuration TopicQuotedTime; + using TPtr = TIntrusivePtr<TWriteStat>; +}; + +class TContinuationToken : public TMoveOnly { + friend class TWriteSession; +private: + TContinuationToken() = default; +}; + +struct TWriterCounters : public TThrRefBase { + using TSelf = TWriterCounters; + using TPtr = TIntrusivePtr<TSelf>; + + explicit TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters); + + ::NMonitoring::TDynamicCounters::TCounterPtr Errors; + ::NMonitoring::TDynamicCounters::TCounterPtr CurrentSessionLifetimeMs; + + ::NMonitoring::TDynamicCounters::TCounterPtr BytesWritten; + ::NMonitoring::TDynamicCounters::TCounterPtr MessagesWritten; + ::NMonitoring::TDynamicCounters::TCounterPtr BytesWrittenCompressed; + + ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightUncompressed; + ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightCompressed; + ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightTotal; + ::NMonitoring::TDynamicCounters::TCounterPtr MessagesInflight; + + //! Histograms reporting % usage of memory limit in time. + //! Provides a histogram looking like: 10% : 100ms, 20%: 300ms, ... 50%: 200ms, ... 100%: 50ms + //! Which means that < 10% memory usage was observed for 100ms during the period and 50% usage was observed for 200ms + //! Used to monitor if the writer successfully deals with data flow provided. Larger values in higher buckets + //! mean that writer is close to overflow (or being overflown) for major periods of time + //! 3 histograms stand for: + //! Total memory usage: + ::NMonitoring::THistogramPtr TotalBytesInflightUsageByTime; + //! Memory usage by messages waiting for comression: + ::NMonitoring::THistogramPtr UncompressedBytesInflightUsageByTime; + //! Memory usage by compressed messages pending for write: + ::NMonitoring::THistogramPtr CompressedBytesInflightUsageByTime; +}; + struct TReaderCounters: public TThrRefBase { using TSelf = TReaderCounters; using TPtr = TIntrusivePtr<TSelf>; @@ -485,7 +532,7 @@ public: //! Unique identifier of partition session. //! It is unique within one read session. - i64 GetPartitionSessionId() const { + ui64 GetPartitionSessionId() const { return PartitionSessionId; } @@ -495,14 +542,14 @@ public: } //! Partition id. - i64 GetPartitionId() const { + ui64 GetPartitionId() const { return PartitionId; } protected: - i64 PartitionSessionId; + ui64 PartitionSessionId; TString TopicPath; - i64 PartitionId; + ui64 PartitionId; }; //! Events for read session. @@ -963,8 +1010,169 @@ IExecutor::TPtr CreateThreadPoolExecutorAdapter( std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started. IExecutor::TPtr CreateThreadPoolExecutor(size_t threads); +IExecutor::TPtr CreateSyncExecutor(); + +//! Events for write session. +struct TWriteSessionEvent { + + //! Event with acknowledge for written messages. + struct TWriteAck { + //! Write result. + enum EEventState { + EES_WRITTEN, //! Successfully written. + EES_ALREADY_WRITTEN, //! Skipped on SeqNo deduplication. + EES_DISCARDED //! In case of destruction of writer or retry policy discarded future retries in this writer. + }; + //! Details of successfully written message. + struct TWrittenMessageDetails { + ui64 Offset; + ui64 PartitionId; + }; + //! Same SeqNo as provided on write. + ui64 SeqNo; + EEventState State; + //! Filled only for EES_WRITTEN. Empty for ALREADY and DISCARDED. + TMaybe<TWrittenMessageDetails> Details; + //! Write stats from server. See TWriteStat. nullptr for DISCARDED event. + TWriteStat::TPtr Stat; + + }; + + struct TAcksEvent { + //! Acks could be batched from several WriteBatch/Write requests. + //! Acks for messages from one WriteBatch request could be emitted as several TAcksEvents - + //! they are provided to client as soon as possible. + TVector<TWriteAck> Acks; + + TString DebugString() const; + + }; + + //! Indicates that a writer is ready to accept new message(s). + //! Continuation token should be kept and then used in write methods. + struct TReadyToAcceptEvent { + TContinuationToken ContinuationToken; + + TString DebugString() const; + + }; + + using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>; +}; + +//! Event debug string. +TString DebugString(const TWriteSessionEvent::TEvent& event); + using TSessionClosedHandler = std::function<void(const TSessionClosedEvent&)>; +//! Settings for write session. +struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { + using TSelf = TWriteSessionSettings; + + TWriteSessionSettings() = default; + TWriteSessionSettings(const TWriteSessionSettings&) = default; + TWriteSessionSettings(TWriteSessionSettings&&) = default; + TWriteSessionSettings(const TString& path, const TString& producerId, const TString& messageGroupId) { + Path(path); + ProducerId(producerId); + MessageGroupId(messageGroupId); + } + + TWriteSessionSettings& operator=(const TWriteSessionSettings&) = default; + TWriteSessionSettings& operator=(TWriteSessionSettings&&) = default; + + //! Path of topic to write. + FLUENT_SETTING(TString, Path); + + //! ProducerId (aka SourceId) to use. + FLUENT_SETTING(TString, ProducerId); + + //! MessageGroupId to use. + FLUENT_SETTING(TString, MessageGroupId); + + //! Write to an exact partition. Generally server assigns partition automatically by message_group_id. + //! Using this option is not recommended unless you know for sure why you need it. + FLUENT_SETTING_OPTIONAL(ui32, PartitionId); + + //! codec and level to use for data compression prior to write. + FLUENT_SETTING_DEFAULT(ECodec, Codec, ECodec::GZIP); + FLUENT_SETTING_DEFAULT(i32, CompressionLevel, 4); + + //! Writer will not accept new messages if memory usage exceeds this limit. + //! Memory usage consists of raw data pending compression and compressed messages being sent. + FLUENT_SETTING_DEFAULT(ui64, MaxMemoryUsage, 20_MB); + + //! Maximum messages accepted by writer but not written (with confirmation from server). + //! Writer will not accept new messages after reaching the limit. + FLUENT_SETTING_DEFAULT(ui32, MaxInflightCount, 100000); + + //! Retry policy enables automatic retries for non-fatal errors. + //! IRetryPolicy::GetDefaultPolicy() if null (not set). + FLUENT_SETTING(IRetryPolicy::TPtr, RetryPolicy); + + //! User metadata that may be attached to write session. + TWriteSessionSettings& AppendSessionMeta(const TString& key, const TString& value) { + Meta_.Fields[key] = value; + return *this; + }; + + TWriteSessionMeta Meta_; + + //! Writer will accumulate messages until reaching up to BatchFlushSize bytes + //! but for no longer than BatchFlushInterval. + //! Upon reaching FlushInterval or FlushSize limit, all messages will be written with one batch. + //! Greatly increases performance for small messages. + //! Setting either value to zero means immediate write with no batching. (Unrecommended, especially for clients + //! sending small messages at high rate). + FLUENT_SETTING_OPTIONAL(TDuration, BatchFlushInterval); + FLUENT_SETTING_OPTIONAL(ui64, BatchFlushSizeBytes); + + FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); + + FLUENT_SETTING_OPTIONAL(TWriterCounters::TPtr, Counters); + + //! Executor for compression tasks. + //! If not set, default executor will be used. + FLUENT_SETTING(IExecutor::TPtr, CompressionExecutor); + + struct TEventHandlers { + using TSelf = TEventHandlers; + using TWriteAckHandler = std::function<void(TWriteSessionEvent::TAcksEvent&)>; + using TReadyToAcceptHandler = std::function<void(TWriteSessionEvent::TReadyToAcceptEvent&)>; + + //! Function to handle Acks events. + //! If this handler is set, write ack events will be handled by handler, + //! otherwise sent to TWriteSession::GetEvent(). + FLUENT_SETTING(TWriteAckHandler, AcksHandler); + + //! Function to handle ReadyToAccept event. + //! If this handler is set, write these events will be handled by handler, + //! otherwise sent to TWriteSession::GetEvent(). + FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHander); + + //! Function to handle close session events. + //! If this handler is set, close session events will be handled by handler + //! and then sent to TWriteSession::GetEvent(). + FLUENT_SETTING(TSessionClosedHandler, SessionClosedHandler); + + //! Function to handle all event types. + //! If event with current type has no handler for this type of event, + //! this handler (if specified) will be used. + //! If this handler is not specified, event can be received with TReadSession::GetEvent() method. + FLUENT_SETTING(std::function<void(TReadSessionEvent::TEvent&)>, CommonHandler); + + //! Executor for handlers. + //! If not set, default single threaded executor will be used. + FLUENT_SETTING(IExecutor::TPtr, HandlersExecutor); + }; + + //! Event handlers. + FLUENT_SETTING(TEventHandlers, EventHandlers); + + //! Enables validation of SeqNo. If enabled, then writer will check writing with seqNo and without it and throws exception. + FLUENT_SETTING_DEFAULT(bool, ValidateSeqNo, true); +}; + //! Read settings for single topic. struct TTopicReadSettings { using TSelf = TTopicReadSettings; @@ -1118,6 +1326,72 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> { FLUENT_SETTING_OPTIONAL(TLog, Log); }; +//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks. +class ISimpleBlockingWriteSession : public TThrRefBase { +public: + //! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded; + //! return - true if write succeeded, false if message was not enqueued for write within blockTimeout. + //! no Ack is provided. + virtual bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(), + const TDuration& blockTimeout = TDuration::Max()) = 0; + + //! Blocks till SeqNo is discovered from server. Returns 0 in case of failure on init. + virtual ui64 GetInitSeqNo() = 0; + + //! Complete all active writes, wait for ack from server and close. + //! closeTimeout - max time to wait. Empty Maybe means infinity. + //! return - true if all writes were completed and acked. false if timeout was reached and some writes were aborted. + + virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; + + //! Returns true if write session is alive and acitve. False if session was closed. + virtual bool IsAlive() const = 0; + + virtual TWriterCounters::TPtr GetCounters() = 0; + + //! Close immediately and destroy, don't wait for anything. + virtual ~ISimpleBlockingWriteSession() = default; +}; + +//! Generic write session with all capabilities. +class IWriteSession { +public: + //! Future that is set when next event is available. + virtual NThreading::TFuture<void> WaitEvent() = 0; + + //! Wait and return next event. Use WaitEvent() for non-blocking wait. + virtual TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) = 0; + + //! Get several events in one call. + //! If blocking = false, instantly returns up to maxEventsCount available events. + //! If blocking = true, blocks till maxEventsCount events are available. + //! If maxEventsCount is unset, write session decides the count to return itself. + virtual TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) = 0; + + //! Future that is set when initial SeqNo is available. + virtual NThreading::TFuture<ui64> GetInitSeqNo() = 0; + + //! Write single message. + //! continuationToken - a token earlier provided to client with ReadyToAccept event. + virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; + + //! Write single message that is already coded by codec. Codec from settings does not apply to this message. + //! continuationToken - a token earlier provided to client with ReadyToAccept event. + //! originalSize - size of unpacked message + virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; + + + //! Wait for all writes to complete (no more that closeTimeout()), than close. Empty maybe - means infinite timeout. + //! return - true if all writes were completed and acked. false if timeout was reached and some writes were aborted. + virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; + + //! Writer counters with different stats (see TWriterConuters). + virtual TWriterCounters::TPtr GetCounters() = 0; + + //! Close() with timeout = 0 and destroy everything instantly. + virtual ~IWriteSession() = default; +}; + class IReadSession { public: //! Main reader loop. @@ -1190,6 +1464,10 @@ public: //! Create read session. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); + //! Create write session. + std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings); + std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings); + private: std::shared_ptr<TImpl> Impl_; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt index 2f5c02c216a..a950fc621c8 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt @@ -28,7 +28,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC cpp-client-ydb_persqueue_core client-ydb_persqueue_core-impl ydb_persqueue_core-ut-ut_utils - client-ydb_persqueue_public-codecs + client-ydb_topic-codecs client-ydb_topic-impl ) target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt index 7e0aedf76ef..9a818366aae 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt @@ -30,7 +30,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC cpp-client-ydb_persqueue_core client-ydb_persqueue_core-impl ydb_persqueue_core-ut-ut_utils - client-ydb_persqueue_public-codecs + client-ydb_topic-codecs client-ydb_topic-impl ) target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE |