summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <[email protected]>2022-10-17 09:51:16 +0300
committerabcdef <[email protected]>2022-10-17 09:51:16 +0300
commitbbe39d44d0036e8b67f8204f00658cf38d8bda65 (patch)
treee99cd66f4a498bae07f077b4a478f1264ce77883
parent9921da9afebffed759e3ef7c54b7083e6cab9d0f (diff)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp60
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp4
3 files changed, 37 insertions, 37 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index aeec25e6772..a6a5de99cb9 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -363,9 +363,9 @@ public:
}
void TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
- size_t* maxByteSize,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
+ size_t& maxByteSize,
size_t& dataSize) const;
TDataDecompressionInfoPtr<UseMigrationProtocol> GetParent() const {
@@ -768,7 +768,7 @@ public:
std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session);
TReadSessionEventInfo<UseMigrationProtocol>
- GetEventImpl(size_t* maxByteSize,
+ GetEventImpl(size_t& maxByteSize,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent>
@@ -882,7 +882,7 @@ private:
typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent
GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
- size_t* maxByteSize,
+ size_t& maxByteSize,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 9dd7d4cb834..f6ca37554bd 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -1799,7 +1799,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::GetDataEventImpl(TIntr
TDataDecompressionInfoPtr<UseMigrationProtocol> parent = event.GetParent();
size_t size = 0;
- event.TakeData(partitionStream, &messages, &compressedMessages, &maxByteSize, size);
+ event.TakeData(partitionStream, messages, compressedMessages, maxByteSize, size);
queue.pop_front();
accumulator.Add(parent, size);
@@ -1809,7 +1809,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::GetDataEventImpl(TIntr
template <bool UseMigrationProtocol>
typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent
TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
- size_t* maxByteSize,
+ size_t& maxByteSize,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) // Assumes that we're under lock.
{
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages;
@@ -1824,7 +1824,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TP
TPartitionStreamImpl<UseMigrationProtocol>::GetDataEventImpl(stream,
event.EventsCount,
- *maxByteSize,
+ maxByteSize,
messages,
compressedMessages,
accumulator);
@@ -1840,7 +1840,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TP
template <bool UseMigrationProtocol>
TReadSessionEventInfo<UseMigrationProtocol>
-TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t* maxByteSize,
+TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) // Assumes that we're under lock.
{
Y_ASSERT(TParent::HasEventsImpl());
@@ -1890,7 +1890,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvents(bool block, TMaybe<size
}
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
- TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize, accumulator);
+ TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
eventInfos.emplace_back(std::move(event));
if (eventInfos.back().IsSessionClosedEvent()) {
break;
@@ -1924,7 +1924,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy
}
if (TParent::HasEventsImpl()) {
- eventInfo = GetEventImpl(&maxByteSize, accumulator);
+ eventInfo = GetEventImpl(maxByteSize, accumulator);
}
} while (block && !eventInfo);
}
@@ -2202,9 +2202,9 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double
template<bool UseMigrationProtocol>
void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
- size_t* maxByteSize,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
+ size_t& maxByteSize,
size_t& dataSize) const
{
auto& msg = Parent->GetServerMessage();
@@ -2236,19 +2236,19 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
messageData.uncompressed_size());
if (Parent->GetDoDecompress()) {
- messages->emplace_back(messageData.data(),
- Parent->GetDecompressionError(Batch, Message),
- messageInfo,
- partitionStream,
- messageData.partition_key(),
- messageData.explicit_hash());
+ messages.emplace_back(messageData.data(),
+ Parent->GetDecompressionError(Batch, Message),
+ messageInfo,
+ partitionStream,
+ messageData.partition_key(),
+ messageData.explicit_hash());
} else {
- compressedMessages->emplace_back(static_cast<ECodec>(messageData.codec()),
- messageData.data(),
- TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo},
- partitionStream,
- messageData.partition_key(),
- messageData.explicit_hash());
+ compressedMessages.emplace_back(static_cast<ECodec>(messageData.codec()),
+ messageData.data(),
+ TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo},
+ partitionStream,
+ messageData.partition_key(),
+ messageData.explicit_hash());
}
} else {
NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(),
@@ -2261,19 +2261,19 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
messageData.message_group_id());
if (Parent->GetDoDecompress()) {
- messages->emplace_back(messageData.data(),
- Parent->GetDecompressionError(Batch, Message),
- messageInfo,
- partitionStream);
+ messages.emplace_back(messageData.data(),
+ Parent->GetDecompressionError(Batch, Message),
+ messageInfo,
+ partitionStream);
} else {
- compressedMessages->emplace_back(static_cast<NTopic::ECodec>(batch.codec()),
- messageData.data(),
- messageInfo,
- partitionStream);
+ compressedMessages.emplace_back(static_cast<NTopic::ECodec>(batch.codec()),
+ messageData.data(),
+ messageInfo,
+ partitionStream);
}
}
- *maxByteSize -= Min(*maxByteSize, messageData.data().size());
+ maxByteSize -= Min(maxByteSize, messageData.data().size());
dataSize += messageData.data().size();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 4cb1076cbc3..65d83f387b6 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -1853,7 +1853,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
size_t maxByteSize = std::numeric_limits<size_t>::max();\
TUserRetrievedEventsInfoAccumulator<true> accumulator; \
\
- auto event = sessionQueue.GetEventImpl(&maxByteSize, accumulator); \
+ auto event = sessionQueue.GetEventImpl(maxByteSize, accumulator); \
\
UNIT_ASSERT(std::holds_alternative<TExpectedEvent>(event.GetEvent()));\
}
@@ -1865,7 +1865,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
size_t maxByteSize = std::numeric_limits<size_t>::max(); \
TUserRetrievedEventsInfoAccumulator<true> accumulator; \
\
- auto event = sessionQueue.GetEventImpl(&maxByteSize, accumulator); \
+ auto event = sessionQueue.GetEventImpl(maxByteSize, accumulator); \
\
UNIT_ASSERT(std::holds_alternative<TExpectedEvent>(event.GetEvent())); \
UNIT_ASSERT_VALUES_EQUAL(std::get<TExpectedEvent>(event.GetEvent()).GetMessagesCount(), count); \