diff options
| author | abcdef <[email protected]> | 2022-10-17 09:51:16 +0300 | 
|---|---|---|
| committer | abcdef <[email protected]> | 2022-10-17 09:51:16 +0300 | 
| commit | bbe39d44d0036e8b67f8204f00658cf38d8bda65 (patch) | |
| tree | e99cd66f4a498bae07f077b4a478f1264ce77883 | |
| parent | 9921da9afebffed759e3ef7c54b7083e6cab9d0f (diff) | |
3 files changed, 37 insertions, 37 deletions
| diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index aeec25e6772..a6a5de99cb9 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -363,9 +363,9 @@ public:      }      void TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, -                  TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, -                  TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, -                  size_t* maxByteSize, +                  TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages, +                  TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages, +                  size_t& maxByteSize,                    size_t& dataSize) const;      TDataDecompressionInfoPtr<UseMigrationProtocol> GetParent() const { @@ -768,7 +768,7 @@ public:                              std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session);      TReadSessionEventInfo<UseMigrationProtocol> -        GetEventImpl(size_t* maxByteSize, +        GetEventImpl(size_t& maxByteSize,                       TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.      TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> @@ -882,7 +882,7 @@ private:      typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent          GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, -                         size_t* maxByteSize, +                         size_t& maxByteSize,                           TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.      bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 9dd7d4cb834..f6ca37554bd 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -1799,7 +1799,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::GetDataEventImpl(TIntr          TDataDecompressionInfoPtr<UseMigrationProtocol> parent = event.GetParent();          size_t size = 0; -        event.TakeData(partitionStream, &messages, &compressedMessages, &maxByteSize, size); +        event.TakeData(partitionStream, messages, compressedMessages, maxByteSize, size);          queue.pop_front();          accumulator.Add(parent, size); @@ -1809,7 +1809,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::GetDataEventImpl(TIntr  template <bool UseMigrationProtocol>  typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent  TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, -                                                                size_t* maxByteSize, +                                                                size_t& maxByteSize,                                                                  TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) // Assumes that we're under lock.  {       TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; @@ -1824,7 +1824,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TP      TPartitionStreamImpl<UseMigrationProtocol>::GetDataEventImpl(stream,                                                                   event.EventsCount, -                                                                 *maxByteSize, +                                                                 maxByteSize,                                                                   messages,                                                                   compressedMessages,                                                                   accumulator); @@ -1840,7 +1840,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TP  template <bool UseMigrationProtocol>  TReadSessionEventInfo<UseMigrationProtocol> -TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t* maxByteSize, +TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,                                                              TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) // Assumes that we're under lock.  {      Y_ASSERT(TParent::HasEventsImpl()); @@ -1890,7 +1890,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvents(bool block, TMaybe<size              }              while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { -                TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize, accumulator); +                TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);                  eventInfos.emplace_back(std::move(event));                  if (eventInfos.back().IsSessionClosedEvent()) {                      break; @@ -1924,7 +1924,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy              }              if (TParent::HasEventsImpl()) { -                eventInfo = GetEventImpl(&maxByteSize, accumulator); +                eventInfo = GetEventImpl(maxByteSize, accumulator);              }          } while (block && !eventInfo);      } @@ -2202,9 +2202,9 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double  template<bool UseMigrationProtocol>  void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, -                                                             TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, -                                                             TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, -                                                             size_t* maxByteSize, +                                                             TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages, +                                                             TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages, +                                                             size_t& maxByteSize,                                                               size_t& dataSize) const  {      auto& msg = Parent->GetServerMessage(); @@ -2236,19 +2236,19 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart                                                                                 messageData.uncompressed_size());          if (Parent->GetDoDecompress()) { -            messages->emplace_back(messageData.data(), -                                   Parent->GetDecompressionError(Batch, Message), -                                   messageInfo, -                                   partitionStream, -                                   messageData.partition_key(), -                                   messageData.explicit_hash()); +            messages.emplace_back(messageData.data(), +                                  Parent->GetDecompressionError(Batch, Message), +                                  messageInfo, +                                  partitionStream, +                                  messageData.partition_key(), +                                  messageData.explicit_hash());          } else { -            compressedMessages->emplace_back(static_cast<ECodec>(messageData.codec()), -                                             messageData.data(), -                                             TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo}, -                                             partitionStream, -                                             messageData.partition_key(), -                                             messageData.explicit_hash()); +            compressedMessages.emplace_back(static_cast<ECodec>(messageData.codec()), +                                            messageData.data(), +                                            TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo}, +                                            partitionStream, +                                            messageData.partition_key(), +                                            messageData.explicit_hash());          }      } else {          NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(), @@ -2261,19 +2261,19 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart                                                                                         messageData.message_group_id());          if (Parent->GetDoDecompress()) { -            messages->emplace_back(messageData.data(), -                                   Parent->GetDecompressionError(Batch, Message), -                                   messageInfo, -                                   partitionStream); +            messages.emplace_back(messageData.data(), +                                  Parent->GetDecompressionError(Batch, Message), +                                  messageInfo, +                                  partitionStream);          } else { -            compressedMessages->emplace_back(static_cast<NTopic::ECodec>(batch.codec()), -                                             messageData.data(), -                                             messageInfo, -                                             partitionStream); +            compressedMessages.emplace_back(static_cast<NTopic::ECodec>(batch.codec()), +                                            messageData.data(), +                                            messageInfo, +                                            partitionStream);          }      } -    *maxByteSize -= Min(*maxByteSize, messageData.data().size()); +    maxByteSize -= Min(maxByteSize, messageData.data().size());      dataSize += messageData.data().size(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 4cb1076cbc3..65d83f387b6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -1853,7 +1853,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {              size_t maxByteSize = std::numeric_limits<size_t>::max();\              TUserRetrievedEventsInfoAccumulator<true> accumulator; \  \ -            auto event = sessionQueue.GetEventImpl(&maxByteSize, accumulator); \ +            auto event = sessionQueue.GetEventImpl(maxByteSize, accumulator); \  \              UNIT_ASSERT(std::holds_alternative<TExpectedEvent>(event.GetEvent()));\          } @@ -1865,7 +1865,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {              size_t maxByteSize = std::numeric_limits<size_t>::max(); \              TUserRetrievedEventsInfoAccumulator<true> accumulator; \  \ -            auto event = sessionQueue.GetEventImpl(&maxByteSize, accumulator); \ +            auto event = sessionQueue.GetEventImpl(maxByteSize, accumulator); \  \              UNIT_ASSERT(std::holds_alternative<TExpectedEvent>(event.GetEvent())); \              UNIT_ASSERT_VALUES_EQUAL(std::get<TExpectedEvent>(event.GetEvent()).GetMessagesCount(), count); \ | 
