aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-18 12:15:08 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-18 12:15:08 +0300
commitb658dedfbf7b6b0262f0113b4f717ebc90b82a11 (patch)
tree65c8e74d9d874d5808e79a25f86ff881e82e12cb
parentf7134d2525269ea8412b3bf0736239fbfd545a2e (diff)
downloadydb-b658dedfbf7b6b0262f0113b4f717ebc90b82a11.tar.gz
YQ-888 RangesMode setting
RangesMode setting ref:2c39da53ed91a68243836f06008a3fd0fe0eceb9
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h5
3 files changed, 12 insertions, 5 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 3b82d7ec0d..0993363293 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -864,7 +864,7 @@ void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions& deferred) { // As
Log << TLOG_DEBUG << "Successfully connected. Initializing session";
Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req;
auto& init = *req.mutable_init_request();
- init.set_ranges_mode(RangesMode);
+ init.set_ranges_mode(GetRangesMode());
for (const TTopicReadSettings& topic : Settings.Topics_) {
auto* topicSettings = init.add_topics_read_settings();
topicSettings->set_topic(topic.Path_);
@@ -980,7 +980,7 @@ void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partition
}
Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req;
bool hasSomethingToCommit = false;
- if (RangesMode) {
+ if (GetRangesMode()) {
hasSomethingToCommit = true;
auto* range = req.mutable_commit()->add_offset_ranges();
range->set_assign_id(partitionStream->GetAssignId());
@@ -1176,7 +1176,7 @@ void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::Migration
for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& messageData : batch.message_data()) {
// Check offsets continuity.
if (messageData.offset() != desiredOffset) {
- bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), RangesMode);
+ bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), GetRangesMode());
Y_VERIFY(res);
}
@@ -1475,6 +1475,10 @@ void TSingleClusterReadSessionImpl::UpdateMemoryUsageStatistics() {
}
}
+bool TSingleClusterReadSessionImpl::GetRangesMode() const {
+ return Settings.RangesMode_.GetOrElse(RangesMode);
+}
+
bool TSingleClusterReadSessionImpl::TPartitionCookieMapping::AddMapping(const TCookie::TPtr& cookie) {
if (!Cookies.emplace(cookie->GetKey(), cookie).second) {
return false;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index e08db09456..4176d2376d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -859,6 +859,8 @@ private:
return Max<i64>(1l, static_cast<i64>(Settings.MaxMemoryUsageBytes_) - GetCompressedDataSizeLimit());
}
+ bool GetRangesMode() const;
+
void CallCloseCallbackImpl();
void UpdateMemoryUsageStatisticsImpl();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index ee6a5fc64a..fda48a2d01 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -1296,6 +1296,9 @@ struct TReadSessionSettings : public TRequestSettings<TReadSessionSettings> {
FLUENT_SETTING_VECTOR(TString, Clusters);
FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
+
+ //! Experimental option
+ FLUENT_SETTING_OPTIONAL(bool, RangesMode);
};
//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks.
@@ -1397,11 +1400,9 @@ public:
//! Stop reading data and process only control events.
//! You might need this function if a receiving side
//! is not ready to process data.
- //! Not implemented yet.
virtual void StopReadingData() = 0;
//! Resume reading data.
- //! Not implemented yet.
virtual void ResumeReadingData() = 0;
//! Close read session.