diff options
Diffstat (limited to 'ydb/core/persqueue/partition.cpp')
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index a238199b53..cc8e2419cf 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -20,7 +20,7 @@ #include <util/folder/path.h> #include <util/string/escape.h> #include <util/system/byteorder.h> - + #define VERIFY_RESULT_BLOB(blob, pos) \ Y_VERIFY(!blob.Data.empty(), "Empty data. SourceId: %s, SeqNo: %" PRIu64, blob.SourceId.data(), blob.SeqNo); \ Y_VERIFY(blob.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, blob.SeqNo); @@ -456,10 +456,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co const TString& topicName, const TString& topicPath, const bool localDC, TString dcId, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, const TActorContext &ctx, bool newPartition) - : TabletID(tabletId) - , Partition(partition) + : TabletID(tabletId) + , Partition(partition) , Config(config) - , TopicName(topicName) + , TopicName(topicName) , TopicPath(topicPath) , LocalDC(localDC) , DCId(std::move(dcId)) @@ -557,9 +557,9 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo out << "MaxCurrently writing: " << MaxWriteResponsesSize; res.push_back(out.Str()); out.Clear(); out << "DataKeysBody size: " << DataKeysBody.size(); res.push_back(out.Str()); out.Clear(); for (ui32 i = 0; i < DataKeysHead.size(); ++i) { - out << "DataKeysHead[" << i << "] size: " << DataKeysHead[i].KeysCount() << " sum: " << DataKeysHead[i].Sum() + out << "DataKeysHead[" << i << "] size: " << DataKeysHead[i].KeysCount() << " sum: " << DataKeysHead[i].Sum() << " border: " << DataKeysHead[i].Border() << " recs: " << DataKeysHead[i].RecsCount() << " intCount: " << DataKeysHead[i].InternalPartsCount(); - res.push_back(out.Str()); out.Clear(); + res.push_back(out.Str()); out.Clear(); } for (auto& avg : AvgWriteBytes) { out << "AvgWriteSize per " << avg.GetDuration().ToString() << " is " << avg.GetValue() << " bytes"; @@ -1225,29 +1225,29 @@ void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TA } } -void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { - // Reply to all outstanding requests in order to destroy corresponding actors - +void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { + // Reply to all outstanding requests in order to destroy corresponding actors + TStringBuilder ss; ss << "Tablet is restarting, topic '" << TopicName << "'"; for (const auto& ev : WaitToChangeOwner) { ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss); - } - + } + for (const auto& w : Requests) { ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, ss); - } - + } + for (const auto& wr : Responses) { ReplyError(ctx, wr.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, TStringBuilder() << ss << " (WriteResponses)"); - } - - for (const auto& ri : ReadInfo) { + } + + for (const auto& ri : ReadInfo) { ReplyError(ctx, ri.second.Destination, NPersQueue::NErrorCode::INITIALIZING, TStringBuilder() << ss << " (ReadInfo) cookie " << ri.first); - } - + } + if (Mirrorer) { Send(Mirrorer->Actor, new TEvents::TEvPoisonPill()); } @@ -1564,7 +1564,7 @@ void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "tablet " << TabletID << " HandleOnInit topic '" << TopicName << "' partition " << Partition << " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage() << " \" errorReason: \"" << response.GetErrorReason() << "\""); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); + ctx.Send(Tablet, new TEvents::TEvPoisonPill()); return; default: Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n"; @@ -1784,7 +1784,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { void TPartition::Handle(TEvPQ::TEvChangeConfig::TPtr& ev, const TActorContext& ctx) { Config = ev->Get()->Config; - TopicName = ev->Get()->TopicName; + TopicName = ev->Get()->TopicName; Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0); |