aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2022-07-13 16:42:15 +0300
committerildar-khisam <ikhis@ydb.tech>2022-07-13 16:42:15 +0300
commitcaafd737598a64041420fe6cd27636b8b54e7afd (patch)
tree8f6dc306dc9f25ccbbb584152a32390dd37a778e
parentd4b3cf864f1c876ff34cd2c30fc03c8572df686c (diff)
downloadydb-caafd737598a64041420fe6cd27636b8b54e7afd.tar.gz
fix topic service
fix topic service [] enable 'topic' by default
-rw-r--r--ydb/core/driver_lib/run/run.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/events.h8
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp21
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.h1
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp83
5 files changed, 83 insertions, 32 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 0b7431c5271..bd5211cc7d2 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -523,7 +523,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["pq"] = &hasPQ;
bool hasPQv1 = services.empty();
names["pqv1"] = &hasPQv1;
- bool hasTopic = false;
+ bool hasTopic = services.empty();
names["topic"] = &hasTopic;
bool hasPQCD = services.empty();
names["pqcd"] = &hasPQCD;
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index 29b45676bf2..1837044a763 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -177,6 +177,14 @@ struct TEvPQProxy {
, ReadTimestampMs(readTimestampMs)
{ }
+ explicit TEvRead(ui64 maxSize)
+ : Guid(CreateGuidAsString())
+ , MaxCount(0)
+ , MaxSize(maxSize)
+ , MaxTimeLagMs(0)
+ , ReadTimestampMs(0) {
+ }
+
const TString Guid;
ui64 MaxCount;
ui64 MaxSize;
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index c6304b6984c..f63c67a3ab9 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -303,6 +303,13 @@ void SetBatchExtraField(Topic::StreamReadMessage::ReadResponse::Batch* batch, TS
(*batch->mutable_write_session_meta())[key] = std::move(value);
}
+i32 GetDataChunkCodec(const NKikimrPQClient::TDataChunk& proto) {
+ if (proto.HasCodec()) {
+ return proto.GetCodec() + 1;
+ }
+ return 0;
+}
+
template<typename TReadResponse>
bool FillBatchedData(
TReadResponse* data, const NKikimrClient::TCmdReadResult& res,
@@ -326,6 +333,8 @@ bool FillBatchedData(
bool hasOffset = false;
bool hasData = false;
+ i32 batchCodec = 0; // UNSPECIFIED
+
typename TReadResponse::Batch* currentBatch = nullptr;
for (ui32 i = 0; i < res.ResultSize(); ++i) {
const auto& r = res.GetResult(i);
@@ -348,15 +357,19 @@ bool FillBatchedData(
sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId());
}
- if (!currentBatch
- || GetBatchWriteTimestampMS(currentBatch) != static_cast<i64>(r.GetWriteTimestampMS())
- || GetBatchSourceId(currentBatch) != sourceId) {
+ if (!currentBatch || GetBatchWriteTimestampMS(currentBatch) != static_cast<i64>(r.GetWriteTimestampMS()) ||
+ GetBatchSourceId(currentBatch) != sourceId ||
+ (!UseMigrationProtocol && GetDataChunkCodec(proto) != batchCodec)) {
// If write time and source id are the same, the rest fields will be the same too.
currentBatch = partitionData->add_batches();
i64 write_ts = static_cast<i64>(r.GetWriteTimestampMS());
Y_VERIFY(write_ts >= 0);
SetBatchWriteTimestampMS(currentBatch, write_ts);
SetBatchSourceId(currentBatch, std::move(sourceId));
+ batchCodec = GetDataChunkCodec(proto);
+ if constexpr (!UseMigrationProtocol) {
+ currentBatch->set_codec(batchCodec);
+ }
if (proto.HasMeta()) {
const auto& header = proto.GetMeta();
@@ -406,7 +419,7 @@ bool FillBatchedData(
*message->mutable_created_at() =
::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(r.GetCreateTimestampMS());
- message->set_message_group_id(r.GetPartitionKey());
+ message->set_message_group_id(GetBatchSourceId(currentBatch));
}
hasData = true;
}
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h
index 6013112419b..de20aee7bdf 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.h
@@ -289,6 +289,7 @@ private:
ui32 MaxReadSize;
i64 MaxTimeLagMs;
i64 ReadTimestampMs;
+ i64 ReadSizeBudget;
TString Auth;
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index d2cd4625976..dd38d513d28 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -30,12 +30,12 @@ using namespace PersQueue::V1;
//TODO: add here tracking of bytes in/out
-template<bool UseMigrationProtocol>
-TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
- TEvStreamPQReadRequest* request, const ui64 cookie, const TActorId& schemeCache, const TActorId& newSchemeCache,
- TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC,
- const NPersQueue::TTopicsListController& topicsHandler
-)
+template <bool UseMigrationProtocol>
+TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadRequest* request, const ui64 cookie,
+ const TActorId& schemeCache, const TActorId& newSchemeCache,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ const TMaybe<TString> clientDC,
+ const NPersQueue::TTopicsListController& topicsHandler)
: Request(request)
, ClientDC(clientDC ? *clientDC : "other")
, StartTimestamp(TInstant::Now())
@@ -52,6 +52,7 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
, MaxReadSize(0)
, MaxTimeLagMs(0)
, ReadTimestampMs(0)
+ , ReadSizeBudget(0)
, ForceACLCheck(false)
, RequestNotChecked(true)
, LastACLCheckTimestamp(TInstant::Zero())
@@ -62,12 +63,10 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
, BytesInflight_(0)
, RequestedBytes(0)
, ReadsInfly(0)
- , TopicsHandler(topicsHandler)
-{
+ , TopicsHandler(topicsHandler) {
Y_ASSERT(Request);
}
-
template<bool UseMigrationProtocol>
TReadSessionActor<UseMigrationProtocol>::~TReadSessionActor() = default;
@@ -254,7 +253,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
break;
}
case TClientMessage::kReadRequest: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); // Proto read message have no parameters
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead(request.read_request().bytes_size()));
break;
}
case TClientMessage::kPartitionSessionStatusRequest: {
@@ -682,8 +681,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
MaxReadSize = NormalizeMaxReadSize(0);
MaxTimeLagMs = 0; // max_lag per topic only
ReadTimestampMs = 0; // read_from per topic only
- // MaxTimeLagMs = ::google::protobuf::util::TimeUtil::DurationToMilliseconds(init.max_lag());
- // ReadTimestampMs = ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(init.read_from());
ReadOnlyLocal = true;
}
if (MaxTimeLagMs < 0) {
@@ -1531,7 +1528,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr&
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got read request with guid: " << event->Guid);
- Reads.emplace_back(event.Release());
+ if constexpr (UseMigrationProtocol) {
+ Reads.emplace_back(event.Release());
+ } else {
+ ReadSizeBudget += event->MaxSize;
+ }
ProcessReads(ctx);
}
@@ -1683,20 +1684,21 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
RequestedBytes -= formedResponse->RequestedBytes;
ReadsInfly--;
+ if constexpr (!UseMigrationProtocol) {
+ ReadSizeBudget += formedResponse->RequestedBytes;
+ ReadSizeBudget -= diff;
+ }
// Bring back available partitions.
// If some partition was removed from partitions container, it is not bad because it will be checked during read processing.
AvailablePartitions.insert(formedResponse->PartitionsBecameAvailable.begin(), formedResponse->PartitionsBecameAvailable.end());
- if (!hasMessages) {
- // process new read
- TClientMessage req;
- if constexpr(UseMigrationProtocol) {
- req.mutable_read();
- } else {
- req.mutable_read_request();
+ if constexpr (UseMigrationProtocol) {
+ if (!hasMessages) {
+ // process new read
+ // Start new reading request with the same guid
+ Reads.emplace_back(new TEvPQProxy::TEvRead(formedResponse->Guid));
}
- Reads.emplace_back(new TEvPQProxy::TEvRead(formedResponse->Guid)); // Start new reading request with the same guid
}
ProcessReads(ctx); // returns false if actor died
@@ -1727,12 +1729,27 @@ ui32 TReadSessionActor<UseMigrationProtocol>::NormalizeMaxReadSize(ui32 sourceVa
template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& ctx) {
- while (!Reads.empty() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES && ReadsInfly < MAX_INFLY_READS) {
+ auto ShouldContinueReads = [this]() {
+ if constexpr (UseMigrationProtocol) {
+ return !Reads.empty() && ReadsInfly < MAX_INFLY_READS;
+ } else {
+ return ReadSizeBudget > 0;
+ }
+ };
+
+ while (ShouldContinueReads() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES) {
ui32 count = MaxReadMessagesCount;
ui64 size = MaxReadSize;
ui32 partitionsAsked = 0;
- typename TFormedReadResponse<TServerMessage>::TPtr formedResponse = new TFormedReadResponse<TServerMessage>(Reads.front()->Guid, ctx.Now());
+ TString guid;
+ if constexpr (UseMigrationProtocol) {
+ guid = Reads.front()->Guid;
+ } else {
+ guid = CreateGuidAsString();
+ }
+ typename TFormedReadResponse<TServerMessage>::TPtr formedResponse =
+ new TFormedReadResponse<TServerMessage>(guid, ctx.Now());
while (!AvailablePartitions.empty()) {
auto part = *AvailablePartitions.begin();
AvailablePartitions.erase(AvailablePartitions.begin());
@@ -1746,7 +1763,10 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
const ui32 ccount = Min<ui32>(part.MsgLag * LAG_GROW_MULTIPLIER, count);
count -= ccount;
- const ui64 csize = (ui64)Min<double>(part.SizeLag * LAG_GROW_MULTIPLIER, size);
+ ui64 csize = (ui64)Min<double>(part.SizeLag * LAG_GROW_MULTIPLIER, size);
+ if constexpr (!UseMigrationProtocol) {
+ csize = Min<i64>(csize, ReadSizeBudget);
+ }
size -= csize;
Y_VERIFY(csize < Max<i32>());
@@ -1765,9 +1785,9 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
auto lags_it = MaxLagByTopic.find(it->second.Topic->GetInternalName());
Y_VERIFY(lags_it != MaxLagByTopic.end());
- ui32 maxLag = Max(MaxTimeLagMs, lags_it->second);
+ ui32 maxLag = lags_it->second;
- TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(Reads.front()->Guid, ccount, csize, maxLag, readTimestampMs);
+ TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(guid, ccount, csize, maxLag, readTimestampMs);
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX
<< " performing read request with guid " << read->Guid
@@ -1786,14 +1806,21 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
RequestedBytes += csize;
formedResponse->RequestedBytes += csize;
+ ReadSizeBudget -= csize;
ctx.Send(it->second.Actor, read.Release());
const auto insertResult = PartitionToReadResponse.insert(std::make_pair(it->second.Actor, formedResponse));
Y_VERIFY(insertResult.second);
+ // Only from single partition
+ if constexpr (!UseMigrationProtocol) {
+ break;
+ }
+
if (count == 0 || size == 0)
break;
}
+
if (partitionsAsked == 0)
break;
ReadsTotal.Inc();
@@ -1807,7 +1834,9 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
if (BytesInflight) {
(*BytesInflight) += diff;
}
- Reads.pop_front();
+ if constexpr (UseMigrationProtocol) {
+ Reads.pop_front();
+ }
}
}