blob: bca03dc72f79514d2c4502836ab77da06504033f (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
#include "read_batch_converter.h"
namespace NPersQueue {
// Flattens one batched partition into the per-message (old) batch layout.
//
// The topic and partition number are copied onto dstBatch. Then every message
// of every source batch is appended to dstBatch as a separate message. Fields
// that the batched format keeps once per batch (source id, write time, ip and
// the optional extra fields) are repeated in the meta of each appended message.
static void Convert(const ReadResponse::BatchedData::PartitionData& partition, ReadResponse::Data::MessageBatch* dstBatch) {
    dstBatch->set_topic(partition.topic());
    dstBatch->set_partition(partition.partition());

    for (const auto& srcBatch : partition.batch()) {
        // Does not depend on the individual message, so query it once per batch.
        const bool withExtraFields = srcBatch.has_extra_fields();

        for (const auto& srcMessage : srcBatch.message_data()) {
            auto* const out = dstBatch->add_message();
            out->set_data(srcMessage.data());
            out->set_offset(srcMessage.offset());

            auto& outMeta = *out->mutable_meta();

            // Values carried by the message itself.
            outMeta.set_seq_no(srcMessage.seq_no());
            outMeta.set_create_time_ms(srcMessage.create_time_ms());
            outMeta.set_codec(srcMessage.codec());
            outMeta.set_uncompressed_size(srcMessage.uncompressed_size());

            // Values shared by all messages of the enclosing batch.
            outMeta.set_source_id(srcBatch.source_id());
            outMeta.set_write_time_ms(srcBatch.write_time_ms());
            outMeta.set_ip(srcBatch.ip());
            if (withExtraFields) {
                *outMeta.mutable_extra_fields() = srcBatch.extra_fields();
            }
        }
    }
}
// Rewrites `response` in place from the batched representation to the old
// per-message one. A response that has no batched_data is left untouched.
void ConvertToOldBatch(ReadResponse& response) {
    if (!response.has_batched_data()) {
        return; // nothing to convert
    }

    // Take the batched payload out of the response before touching `data`:
    // obtaining the mutable `data` field below clears the batched_data field.
    ReadResponse::BatchedData batched;
    batched.Swap(response.mutable_batched_data());

    auto* const oldData = response.mutable_data();
    oldData->set_cookie(batched.cookie());

    // One destination message batch is produced per source partition.
    for (const auto& srcPartition : batched.partition_data()) {
        Convert(srcPartition, oldData->add_message_batch());
    }
}
} // namespace NPersQueue
|