diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-18 12:15:08 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-18 12:15:08 +0300 |
commit | b658dedfbf7b6b0262f0113b4f717ebc90b82a11 (patch) | |
tree | 65c8e74d9d874d5808e79a25f86ff881e82e12cb | |
parent | f7134d2525269ea8412b3bf0736239fbfd545a2e (diff) | |
download | ydb-b658dedfbf7b6b0262f0113b4f717ebc90b82a11.tar.gz |
YQ-888 RangesMode setting
RangesMode setting
ref:2c39da53ed91a68243836f06008a3fd0fe0eceb9
3 files changed, 12 insertions, 5 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 3b82d7ec0d..0993363293 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -864,7 +864,7 @@ void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions& deferred) { // As Log << TLOG_DEBUG << "Successfully connected. Initializing session"; Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; auto& init = *req.mutable_init_request(); - init.set_ranges_mode(RangesMode); + init.set_ranges_mode(GetRangesMode()); for (const TTopicReadSettings& topic : Settings.Topics_) { auto* topicSettings = init.add_topics_read_settings(); topicSettings->set_topic(topic.Path_); @@ -980,7 +980,7 @@ void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partition } Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; bool hasSomethingToCommit = false; - if (RangesMode) { + if (GetRangesMode()) { hasSomethingToCommit = true; auto* range = req.mutable_commit()->add_offset_ranges(); range->set_assign_id(partitionStream->GetAssignId()); @@ -1176,7 +1176,7 @@ void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::Migration for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& messageData : batch.message_data()) { // Check offsets continuity. if (messageData.offset() != desiredOffset) { - bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), RangesMode); + bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), GetRangesMode()); Y_VERIFY(res); } @@ -1475,6 +1475,10 @@ void TSingleClusterReadSessionImpl::UpdateMemoryUsageStatistics() { } } +bool TSingleClusterReadSessionImpl::GetRangesMode() const { + return Settings.RangesMode_.GetOrElse(RangesMode); +} + bool TSingleClusterReadSessionImpl::TPartitionCookieMapping::AddMapping(const TCookie::TPtr& cookie) { if (!Cookies.emplace(cookie->GetKey(), cookie).second) { return false; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index e08db09456..4176d2376d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -859,6 +859,8 @@ private: return Max<i64>(1l, static_cast<i64>(Settings.MaxMemoryUsageBytes_) - GetCompressedDataSizeLimit()); } + bool GetRangesMode() const; + void CallCloseCallbackImpl(); void UpdateMemoryUsageStatisticsImpl(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index ee6a5fc64a..fda48a2d01 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -1296,6 +1296,9 @@ struct TReadSessionSettings : public TRequestSettings<TReadSessionSettings> { FLUENT_SETTING_VECTOR(TString, Clusters); FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); + + //! Experimental option + FLUENT_SETTING_OPTIONAL(bool, RangesMode); }; //! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks. @@ -1397,11 +1400,9 @@ public: //! Stop reading data and process only control events. //! You might need this function if a receiving side //! is not ready to process data. - //! Not implemented yet. virtual void StopReadingData() = 0; //! Resume reading data. - //! Not implemented yet. virtual void ResumeReadingData() = 0; //! Close read session. |