aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-18 18:40:21 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-18 18:40:21 +0300
commit588fc4b8dc6cde4e910a512034844b3166341b6b (patch)
tree87ee5694964df100ffdb0eeea873b755026f7718
parentdf455b910e1d049369547fe5c4e83417eb7a73a1 (diff)
downloadydb-588fc4b8dc6cde4e910a512034844b3166341b6b.tar.gz
merge: enable logs in mirrorer and some fixes
do not override log formatter inside ydb_persqueue sdk fix fix REVIEW: 2435689 more information from mirrorer & fix commit problem LOGBROKER-7400 1) исправлена проблема коммита офсета - коммит делается через оригинальное вычитанное сообщение 2) отслеживается самая старая фьюча по которой нет ответа 3) правильная обработка WaitNextReaderEventInFlight 4) постоянно запрашивается partition status REVIEW: 2435480 enable sdk logs in mirrorer LOGBROKER-7400 enable logs REVIEW: 2432795 REVIEW: 2438061 x-ydb-stable-ref: 1f12597d5956d27d42acad88a0e9d13e9e0b4bff
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp16
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h7
-rw-r--r--ydb/core/driver_lib/run/run.cpp14
-rw-r--r--ydb/core/persqueue/actor_persqueue_client_iface.h43
-rw-r--r--ydb/core/persqueue/events/internal.h4
-rw-r--r--ydb/core/persqueue/mirrorer.cpp75
-rw-r--r--ydb/core/persqueue/mirrorer.h20
-rw-r--r--ydb/core/persqueue/mirrorer_ut.cpp2
-rw-r--r--ydb/core/persqueue/ya.make1
-rw-r--r--ydb/core/yq/libs/shared_resources/shared_resources.cpp64
-rw-r--r--ydb/core/yq/libs/shared_resources/ya.make1
-rw-r--r--ydb/library/logger/actor.cpp47
-rw-r--r--ydb/library/logger/actor.h32
-rw-r--r--ydb/library/logger/ya.make14
-rw-r--r--ydb/library/ya.make1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp142
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp133
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp2
21 files changed, 402 insertions, 230 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 45dc30fa75..aef92dc043 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1878,22 +1878,6 @@ void TPersQueueClusterTrackerInitializer::InitializeServices(NActors::TActorSyst
TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId)));
}
-// TPersQueueLibSharedInstanceInitializer
-
-TPersQueueLibSharedInstanceInitializer::TPersQueueLibSharedInstanceInitializer(const TKikimrRunConfig& runConfig)
- : IKikimrServicesInitializer(runConfig)
-{}
-
-void TPersQueueLibSharedInstanceInitializer::InitializeServices(NActors::TActorSystemSetup*, const NKikimr::TAppData* appData) {
- if (Config.HasPQConfig() && Config.GetPQConfig().GetEnabled()) {
- if (Config.GetPQConfig().GetMirrorConfig().GetEnabled()) {
- if (appData->PersQueueMirrorReaderFactory) {
- appData->PersQueueMirrorReaderFactory->Initialize(Config.GetPQConfig().GetMirrorConfig().GetPQLibSettings());
- }
- }
- }
-}
-
// TMemProfMonitorInitializer
TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig)
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 6265bcc03b..b3ac9b9cbe 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -353,13 +353,6 @@ public:
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
-class TPersQueueLibSharedInstanceInitializer : public IKikimrServicesInitializer {
-public:
- TPersQueueLibSharedInstanceInitializer(const TKikimrRunConfig& runConfig);
-
- void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
-};
-
class TMemProfMonitorInitializer : public IKikimrServicesInitializer {
public:
TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig);
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index c88e7b8fdc..7553c4cfe7 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1128,6 +1128,18 @@ void TKikimrRunner::InitializeActorSystem(
if (YqSharedResources) {
YqSharedResources->Init(ActorSystem.Get());
}
+
+ if (runConfig.AppConfig.HasPQConfig()) {
+ const auto& pqConfig = runConfig.AppConfig.GetPQConfig();
+ if (pqConfig.GetEnabled() && pqConfig.GetMirrorConfig().GetEnabled()) {
+ if (AppData->PersQueueMirrorReaderFactory) {
+ AppData->PersQueueMirrorReaderFactory->Initialize(
+ ActorSystem.Get(),
+ pqConfig.GetMirrorConfig().GetPQLibSettings()
+ );
+ }
+ }
+ }
}
TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializersList(
@@ -1263,8 +1275,6 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TPersQueueClusterTrackerInitializer(runConfig));
}
- sil->AddServiceInitializer(new TPersQueueLibSharedInstanceInitializer(runConfig));
-
sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig));
#if defined(ENABLE_MEMORY_TRACKING)
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h
index 2f8a8d4ab8..1bf0ed9a4c 100644
--- a/ydb/core/persqueue/actor_persqueue_client_iface.h
+++ b/ydb/core/persqueue/actor_persqueue_client_iface.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/protos/pqconfig.pb.h>
+#include <ydb/library/logger/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
@@ -14,7 +15,21 @@ namespace NKikimr::NPQ {
class IPersQueueMirrorReaderFactory {
public:
- virtual void Initialize(const NKikimrPQ::TPQConfig::TPQLibSettings& settings) const = 0;
+ IPersQueueMirrorReaderFactory()
+ : ActorSystemPtr(std::make_shared<TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr))
+ {}
+
+ virtual void Initialize(
+ NActors::TActorSystem* actorSystem,
+ const NKikimrPQ::TPQConfig::TPQLibSettings& settings
+ ) const {
+ Y_VERIFY(!ActorSystemPtr->load(std::memory_order_relaxed), "Double init");
+ ActorSystemPtr->store(actorSystem, std::memory_order_relaxed);
+
+ auto driverConfig = NYdb::TDriverConfig()
+ .SetNetworkThreadsNum(settings.GetThreadsCount());
+ Driver = std::make_shared<NYdb::TDriver>(driverConfig);
+ }
virtual std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider(
const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred
@@ -24,20 +39,23 @@ public:
const NKikimrPQ::TMirrorPartitionConfig& config,
ui32 partition,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
- ui64 maxMemoryUsageBytes
+ ui64 maxMemoryUsageBytes,
+ TMaybe<TLog> logger = Nothing()
) const = 0;
virtual ~IPersQueueMirrorReaderFactory() = default;
+
+ TDeferredActorLogBackend::TSharedAtomicActorSystemPtr GetSharedActorSystem() const {
+ return ActorSystemPtr;
+ }
+
+protected:
+ mutable TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
+ mutable std::shared_ptr<NYdb::TDriver> Driver;
};
class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
public:
- void Initialize(const NKikimrPQ::TPQConfig::TPQLibSettings& settings) const override {
- auto driverConfig = NYdb::TDriverConfig()
- .SetNetworkThreadsNum(settings.GetThreadsCount());
- Driver = std::make_shared<NYdb::TDriver>(driverConfig);
- }
-
std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider(
const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred
) const override {
@@ -55,7 +73,8 @@ public:
const NKikimrPQ::TMirrorPartitionConfig& config,
ui32 partition,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
- ui64 maxMemoryUsageBytes
+ ui64 maxMemoryUsageBytes,
+ TMaybe<TLog> logger = Nothing()
) const override {
NYdb::NPersQueue::TPersQueueClientSettings clientSettings = NYdb::NPersQueue::TPersQueueClientSettings()
.DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort())
@@ -72,6 +91,9 @@ public:
.DisableClusterDiscovery(true)
.ReadOnlyOriginal(true)
.RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy());
+ if (logger) {
+ settings.Log(logger.GetRef());
+ }
if (config.HasReadFromTimestampsMs()) {
settings.StartingMessageTimestamp(TInstant::MilliSeconds(config.GetReadFromTimestampsMs()));
}
@@ -82,9 +104,6 @@ public:
NYdb::NPersQueue::TPersQueueClient persQueueClient(*Driver, clientSettings);
return persQueueClient.CreateReadSession(settings);
}
-
-private:
- mutable std::shared_ptr<NYdb::TDriver> Driver;
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index ced72293ac..de9b74e75b 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -618,8 +618,10 @@ struct TEvPQ {
};
struct TEvReaderEventArrived : public TEventLocal<TEvReaderEventArrived, EvReaderEventArrived> {
- TEvReaderEventArrived()
+ TEvReaderEventArrived(ui64 id) : Id(id)
{}
+
+ ui64 Id;
};
};
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index 37aea95ad1..9873e3664a 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -167,11 +167,9 @@ void TMirrorer::ProcessWriteResponse(
) {
Y_VERIFY_S(response.CmdWriteResultSize() == WriteInFlight.size(), MirrorerDescription()
<< "CmdWriteResultSize=" << response.CmdWriteResultSize() << ", but expected=" << WriteInFlight.size()
- << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().Offset)
+ << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().GetOffset(0))
<< " response: " << response);
-
- NYdb::NPersQueue::TDeferredCommit deferredCommit;
for (auto& result : response.GetCmdWriteResult()) {
if (result.GetAlreadyWritten()) {
Y_VERIFY_S(
@@ -185,22 +183,19 @@ void TMirrorer::ProcessWriteResponse(
}
auto& writtenMessageInfo = WriteInFlight.front();
if (MirrorerTimeLags) {
- TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.WriteTime;
+ TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.GetWriteTime(0);
MirrorerTimeLags->IncFor(lag.MilliSeconds(), 1);
}
- ui64 offset = writtenMessageInfo.Offset;
+ ui64 offset = writtenMessageInfo.GetOffset(0);
Y_VERIFY((ui64)result.GetOffset() == offset);
Y_VERIFY_S(EndOffset <= offset, MirrorerDescription()
<< "end offset more the written " << EndOffset << ">" << offset);
EndOffset = offset + 1;
- BytesInFlight -= writtenMessageInfo.Size;
+ BytesInFlight -= writtenMessageInfo.GetData().size();
- if (PartitionStream) {
- deferredCommit.Add(PartitionStream, offset);
- }
+ WriteInFlight.front().Commit();
WriteInFlight.pop_front();
}
- deferredCommit.Commit();
AfterSuccesWrite(ctx);
}
@@ -266,6 +261,17 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte
<< "[STATE] bytes in flight " << BytesInFlight
<< ", messages in write request " << WriteInFlight.size()
<< ", queue to write: " << Queue.size());
+ LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
+ << "[STATE] read futures inflight " << ReadFuturesInFlight << ", last id=" << ReadFeatureId);
+ if (!ReadFeatures.empty()) {
+ const auto& oldest = *ReadFeatures.begin();
+ const auto& info = oldest.second;
+ LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
+ << "[STATE] The oldest read future id=" << oldest.first
+ << ", ts=" << info.first << " age=" << (ctx.Now() - info.first)
+ << ", future state: " << info.second.Initialized()
+ << "/" << info.second.HasValue() << "/" << info.second.HasException());
+ }
}
if (!ReadSession && LastInitStageTimestamp + INIT_TIMEOUT < ctx.Now()) {
ProcessError(ctx, TStringBuilder() << "read session was not created, the last stage of initialization was at "
@@ -319,9 +325,7 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) {
req->SetCookie(WRITE_REQUEST_COOKIE);
while (!Queue.empty() && AddToWriteRequest(*req, Queue.front())) {
- auto& message = Queue.front();
- ui64 dataSize = message.GetData().size();
- WriteInFlight.emplace_back(message.GetOffset(0), dataSize, message.GetWriteTime(0));
+ WriteInFlight.emplace_back(std::move(Queue.front()));
Queue.pop_front();
}
@@ -386,7 +390,17 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
Y_VERIFY(factory);
- ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT);
+ TLog log(MakeHolder<TDeferredActorLogBackend>(
+ factory->GetSharedActorSystem(),
+ NKikimrServices::PQ_MIRRORER
+ ));
+
+ TString logPrefix = TStringBuilder() << MirrorerDescription() << "[reader " << ++ReaderGeneration << "] ";
+ log.SetFormatter([logPrefix](ELogPriority, TStringBuf message) -> TString {
+ return logPrefix + message;
+ });
+
+ ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log);
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< " read session created: " << ReadSession->GetSessionId());
@@ -400,13 +414,13 @@ void TMirrorer::RequestSourcePartitionStatus(TEvPQ::TEvRequestPartitionStatus::T
}
void TMirrorer::RequestSourcePartitionStatus() {
- if (Config.GetSyncWriteTime() && PartitionStream && ReadSession) {
+ if (PartitionStream && ReadSession) {
PartitionStream->RequestStatus();
}
}
void TMirrorer::TryUpdateWriteTimetsamp(const TActorContext &ctx) {
- if (!WriteRequestInFlight && StreamStatus && EndOffset == StreamStatus->GetEndOffset()) {
+ if (Config.GetSyncWriteTime() && !WriteRequestInFlight && StreamStatus && EndOffset == StreamStatus->GetEndOffset()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< " update write timestamp from original topic: " << StreamStatus->DebugString());
THolder<TEvPersQueue::TEvRequest> request = MakeHolder<TEvPersQueue::TEvRequest>();
@@ -438,6 +452,10 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
LastInitStageTimestamp = ctx.Now();
ReadSession = nullptr;
PartitionStream = nullptr;
+ ReadFuturesInFlight = 0;
+ ReadFeatures.clear();
+ WaitNextReaderEventInFlight = false;
+
Become(&TThis::StateInitConsumer);
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " schedule consumer creation");
@@ -469,18 +487,33 @@ void TMirrorer::StartWaitNextReaderEvent(const TActorContext& ctx) {
if (WaitNextReaderEventInFlight) {
return;
}
+ WaitNextReaderEventInFlight = true;
+
+ ui64 futureId = ++ReadFeatureId;
+ ++ReadFuturesInFlight;
auto future = ReadSession->WaitEvent();
+
future.Subscribe(
[
actorSystem = ctx.ExecutorThread.ActorSystem,
- selfId = SelfId()
+ selfId = SelfId(),
+ futureId=futureId
](const NThreading::TFuture<void>&) {
- actorSystem->Send(new NActors::IEventHandle(selfId, selfId, new TEvPQ::TEvReaderEventArrived()));
+ actorSystem->Send(new NActors::IEventHandle(selfId, selfId, new TEvPQ::TEvReaderEventArrived(futureId)));
}
);
+
+ if (ReadFeatures.size() < MAX_READ_FUTURES_STORE) {
+ ReadFeatures[futureId] = {ctx.Now(), future};
+ }
}
-void TMirrorer::ProcessNextReaderEvent(TEvPQ::TEvReaderEventArrived::TPtr&, const TActorContext& ctx) {
+void TMirrorer::ProcessNextReaderEvent(TEvPQ::TEvReaderEventArrived::TPtr& ev, const TActorContext& ctx) {
+ {
+ --ReadFuturesInFlight;
+ ReadFeatures.erase(ev->Get()->Id);
+ }
+
TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false);
WaitNextReaderEventInFlight = false;
@@ -547,6 +580,10 @@ void TMirrorer::ProcessNextReaderEvent(TEvPQ::TEvReaderEventArrived::TPtr&, cons
ProcessError(ctx, TStringBuilder() << " read session closed: " << closeSessionEvent->DebugString());
ScheduleConsumerCreation(ctx);
return;
+ } else {
+ ProcessError(ctx, TStringBuilder() << " got unmatched event: " << event.GetRef().index());
+ ScheduleConsumerCreation(ctx);
+ return;
}
Send(SelfId(), new TEvents::TEvWakeup());
diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h
index 6819891f6c..ff1fb9671b 100644
--- a/ydb/core/persqueue/mirrorer.h
+++ b/ydb/core/persqueue/mirrorer.h
@@ -19,6 +19,7 @@ namespace NPQ {
class TMirrorer : public TActorBootstrapped<TMirrorer> {
private:
+ const ui64 MAX_READ_FUTURES_STORE = 25;
const ui64 MAX_BYTES_IN_FLIGHT = 8 * 1024 * 1024;
const TDuration WRITE_RETRY_TIMEOUT_MAX = TDuration::Seconds(1);
const TDuration WRITE_RETRY_TIMEOUT_START = TDuration::MilliSeconds(1);
@@ -45,18 +46,6 @@ private:
UPDATE_WRITE_TIMESTAMP = 2
};
- struct TMessageInfo {
- ui64 Offset;
- ui64 Size;
- TInstant WriteTime;
-
- TMessageInfo(ui64 offset, ui64 size, TInstant writeTime)
- : Offset(offset)
- , Size(size)
- , WriteTime(writeTime)
- {}
- };
-
private:
STFUNC(StateInitConsumer)
@@ -167,13 +156,14 @@ private:
NKikimrPQ::TMirrorPartitionConfig Config;
TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> Queue;
- TDeque<TMessageInfo> WriteInFlight;
+ TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight;
ui64 BytesInFlight = 0;
std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight;
TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START;
TInstant WriteRequestTimestamp;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProvider;
std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession;
+ ui64 ReaderGeneration = 0;
NYdb::NPersQueue::TPartitionStream::TPtr PartitionStream;
THolder<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent> StreamStatus;
TInstant LastInitStageTimestamp;
@@ -193,6 +183,10 @@ private:
TMultiCounter InitTimeoutCounter;
TMultiCounter WriteTimeoutCounter;
THolder<TPercentileCounter> MirrorerTimeLags;
+
+ TMap<ui64, std::pair<TInstant, NThreading::TFuture<void>>> ReadFeatures;
+ ui64 ReadFeatureId = 0;
+ ui64 ReadFuturesInFlight = 0;
};
}// NPQ
diff --git a/ydb/core/persqueue/mirrorer_ut.cpp b/ydb/core/persqueue/mirrorer_ut.cpp
index c1114a9958..800a89c491 100644
--- a/ydb/core/persqueue/mirrorer_ut.cpp
+++ b/ydb/core/persqueue/mirrorer_ut.cpp
@@ -14,7 +14,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
const auto& settings = server.CleverServer->GetRuntime()->GetAppData().PQConfig.GetMirrorConfig().GetPQLibSettings();
auto fabric = std::make_shared<NKikimr::NPQ::TPersQueueMirrorReaderFactory>();
- fabric->Initialize(settings);
+ fabric->Initialize(server.CleverServer->GetRuntime()->GetAnyNodeActorSystem(), settings);
for (ui32 nodeId = 0; nodeId < server.CleverServer->GetRuntime()->GetNodeCount(); ++nodeId) {
server.CleverServer->GetRuntime()->GetAppData(nodeId).PersQueueMirrorReaderFactory = fabric.get();
}
diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make
index 50e89676ef..e8acc79a80 100644
--- a/ydb/core/persqueue/ya.make
+++ b/ydb/core/persqueue/ya.make
@@ -46,6 +46,7 @@ PEERDIR(
ydb/core/persqueue/partition_key_range
ydb/core/persqueue/writer
ydb/core/protos
+ ydb/library/logger
ydb/library/persqueue/counter_time_keeper
ydb/library/persqueue/topic_parser
ydb/public/lib/base
diff --git a/ydb/core/yq/libs/shared_resources/shared_resources.cpp b/ydb/core/yq/libs/shared_resources/shared_resources.cpp
index 3dfd566232..54f5488249 100644
--- a/ydb/core/yq/libs/shared_resources/shared_resources.cpp
+++ b/ydb/core/yq/libs/shared_resources/shared_resources.cpp
@@ -2,11 +2,9 @@
#include <ydb/core/protos/services.pb.h>
#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/library/logger/actor.h>
#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/log.h>
-#include <library/cpp/logger/backend.h>
-#include <library/cpp/logger/record.h>
#include <util/generic/cast.h>
#include <util/generic/strbuf.h>
@@ -23,64 +21,8 @@ namespace NYq {
namespace {
-// Log backend that allows us to create shared YDB driver early (before actor system starts),
-// but log to actor system.
-class TDeferredActorSystemPtrInitActorLogBackend : public TLogBackend {
-public:
- using TAtomicActorSystemPtr = std::atomic<NActors::TActorSystem*>;
- using TSharedAtomicActorSystemPtr = std::shared_ptr<TAtomicActorSystemPtr>;
-
- TDeferredActorSystemPtrInitActorLogBackend(TSharedAtomicActorSystemPtr actorSystem, int logComponent)
- : ActorSystemPtr(std::move(actorSystem))
- , LogComponent(logComponent)
- {
- }
-
- NActors::NLog::EPriority GetActorLogPriority(ELogPriority priority) {
- switch (priority) {
- case TLOG_EMERG:
- return NActors::NLog::PRI_EMERG;
- case TLOG_ALERT:
- return NActors::NLog::PRI_ALERT;
- case TLOG_CRIT:
- return NActors::NLog::PRI_CRIT;
- case TLOG_ERR:
- return NActors::NLog::PRI_ERROR;
- case TLOG_WARNING:
- return NActors::NLog::PRI_WARN;
- case TLOG_NOTICE:
- return NActors::NLog::PRI_NOTICE;
- case TLOG_INFO:
- return NActors::NLog::PRI_INFO;
- case TLOG_DEBUG:
- return NActors::NLog::PRI_DEBUG;
- default:
- return NActors::NLog::PRI_TRACE;
- }
- }
-
- void WriteData(const TLogRecord& rec) override {
- NActors::TActorSystem* actorSystem = ActorSystemPtr->load(std::memory_order_relaxed);
- if (Y_LIKELY(actorSystem)) {
- LOG_LOG(*actorSystem, GetActorLogPriority(rec.Priority), LogComponent, TString(rec.Data, rec.Len));
- } else {
- // Not inited. Temporary write to stderr.
- TStringBuilder out;
- out << TStringBuf(rec.Data, rec.Len) << Endl;
- Cerr << out;
- }
- }
-
- void ReopenLog() override {
- }
-
-protected:
- TSharedAtomicActorSystemPtr ActorSystemPtr;
- const int LogComponent;
-};
-
struct TActorSystemPtrMixin {
- TDeferredActorSystemPtrInitActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared<TDeferredActorSystemPtrInitActorLogBackend::TAtomicActorSystemPtr>(nullptr);
+ NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr);
};
struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedResources {
@@ -114,7 +56,7 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
cfg.SetGrpcMemoryQuota(config.GetGrpcMemoryQuota());
}
cfg.SetDiscoveryMode(NYdb::EDiscoveryMode::Async); // We are in actor system!
- cfg.SetLog(MakeHolder<TDeferredActorSystemPtrInitActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
+ cfg.SetLog(MakeHolder<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
return cfg;
}
diff --git a/ydb/core/yq/libs/shared_resources/ya.make b/ydb/core/yq/libs/shared_resources/ya.make
index 46c9a3bb2b..6b1eccd910 100644
--- a/ydb/core/yq/libs/shared_resources/ya.make
+++ b/ydb/core/yq/libs/shared_resources/ya.make
@@ -13,6 +13,7 @@ PEERDIR(
ydb/core/protos
ydb/core/yq/libs/events
ydb/core/yq/libs/shared_resources/interface
+ ydb/library/logger
ydb/library/security
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_table
diff --git a/ydb/library/logger/actor.cpp b/ydb/library/logger/actor.cpp
new file mode 100644
index 0000000000..5c7f244f6e
--- /dev/null
+++ b/ydb/library/logger/actor.cpp
@@ -0,0 +1,47 @@
+#include "actor.h"
+
+namespace NKikimr {
+
+ TDeferredActorLogBackend::TDeferredActorLogBackend(TSharedAtomicActorSystemPtr actorSystem, int logComponent)
+ : ActorSystemPtr(std::move(actorSystem))
+ , LogComponent(logComponent)
+ {
+ }
+
+ NActors::NLog::EPriority TDeferredActorLogBackend::GetActorLogPriority(ELogPriority priority) const {
+ switch (priority) {
+ case TLOG_EMERG:
+ return NActors::NLog::PRI_EMERG;
+ case TLOG_ALERT:
+ return NActors::NLog::PRI_ALERT;
+ case TLOG_CRIT:
+ return NActors::NLog::PRI_CRIT;
+ case TLOG_ERR:
+ return NActors::NLog::PRI_ERROR;
+ case TLOG_WARNING:
+ return NActors::NLog::PRI_WARN;
+ case TLOG_NOTICE:
+ return NActors::NLog::PRI_NOTICE;
+ case TLOG_INFO:
+ return NActors::NLog::PRI_INFO;
+ case TLOG_DEBUG:
+ return NActors::NLog::PRI_DEBUG;
+ default:
+ return NActors::NLog::PRI_TRACE;
+ }
+ }
+
+ void TDeferredActorLogBackend::WriteData(const TLogRecord& rec) {
+ NActors::TActorSystem* actorSystem = ActorSystemPtr->load(std::memory_order_relaxed);
+ if (Y_LIKELY(actorSystem)) {
+ LOG_LOG(*actorSystem, GetActorLogPriority(rec.Priority), LogComponent, TString(rec.Data, rec.Len));
+ } else {
+ // Not inited. Temporary write to stderr.
+ TStringBuilder out;
+ out << TStringBuf(rec.Data, rec.Len) << Endl;
+ Cerr << out;
+ }
+ }
+
+} // NKikimr
+
diff --git a/ydb/library/logger/actor.h b/ydb/library/logger/actor.h
new file mode 100644
index 0000000000..bccf2cbdfe
--- /dev/null
+++ b/ydb/library/logger/actor.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/logger/backend.h>
+#include <library/cpp/logger/record.h>
+
+
+namespace NKikimr {
+
+// Log backend that allows us to create shared YDB driver early (before actor system starts),
+// but log to actor system.
+class TDeferredActorLogBackend : public TLogBackend {
+public:
+ using TAtomicActorSystemPtr = std::atomic<NActors::TActorSystem*>;
+ using TSharedAtomicActorSystemPtr = std::shared_ptr<TAtomicActorSystemPtr>;
+
+ TDeferredActorLogBackend(TSharedAtomicActorSystemPtr actorSystem, int logComponent);
+
+ NActors::NLog::EPriority GetActorLogPriority(ELogPriority priority) const;
+
+ void WriteData(const TLogRecord& rec) override;
+
+ void ReopenLog() override {}
+
+protected:
+ TSharedAtomicActorSystemPtr ActorSystemPtr;
+ const int LogComponent;
+};
+
+} // NKikimr
+
diff --git a/ydb/library/logger/ya.make b/ydb/library/logger/ya.make
new file mode 100644
index 0000000000..4eb705eac0
--- /dev/null
+++ b/ydb/library/logger/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(g:kikimr)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/logger
+)
+
+SRCS(
+ actor.cpp
+)
+
+END()
diff --git a/ydb/library/ya.make b/ydb/library/ya.make
index 2fb80851bb..7cf5bf8d66 100644
--- a/ydb/library/ya.make
+++ b/ydb/library/ya.make
@@ -9,6 +9,7 @@ RECURSE(
http_proxy
keys
login
+ logger
mkql_proto
naming_conventions
pdisk_io
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 3b82d7ec0d..d44a85e527 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -51,13 +51,17 @@ private:
std::weak_ptr<TReadSession> Session;
};
+TStringBuilder TReadSession::GetLogPrefix() const {
+ return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] ";
+}
+
TReadSession::TReadSession(const TReadSessionSettings& settings,
std::shared_ptr<TPersQueueClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState)
: Settings(settings)
, SessionId(CreateGuidAsString())
- , Log(dbDriverState->Log)
+ , Log(settings.Log_.GetOrElse(dbDriverState->Log))
, Client(std::move(client))
, Connections(std::move(connections))
, DbDriverState(std::move(dbDriverState))
@@ -67,12 +71,6 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
}
MakeCountersIfNeeded();
-
- {
- TStringBuilder logPrefix;
- logPrefix << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] ";
- Log.SetFormatter(GetPrefixLogFormatter(logPrefix));
- }
}
TReadSession::~TReadSession() {
@@ -99,7 +97,7 @@ void TReadSession::Start() {
return;
}
- Log << TLOG_INFO << "Starting read session";
+ Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session");
if (Settings.DisableClusterDiscovery_) {
ProceedWithoutClusterDiscovery();
} else {
@@ -135,7 +133,7 @@ void TReadSession::StartClusterDiscovery() {
return;
}
- Log << TLOG_DEBUG << "Starting cluster discovery";
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Starting cluster discovery");
ClusterDiscoveryDelayContext = nullptr;
}
@@ -196,22 +194,24 @@ void TReadSession::CreateClusterSessionsImpl() {
if (sessionSettings.MaxMemoryUsageBytes_ > clusterSessionsCount && sessionSettings.MaxMemoryUsageBytes_ != std::numeric_limits<size_t>::max()) {
sessionSettings.MaxMemoryUsageBytes_ /= clusterSessionsCount;
}
- Log << TLOG_DEBUG << "Starting session to cluster " << clusterName << " (" << clusterSessionInfo.ClusterEndpoint << ")";
+ Log.Write(
+ TLOG_DEBUG,
+ GetLogPrefix() << "Starting session to cluster " << clusterName
+ << " (" << clusterSessionInfo.ClusterEndpoint << ")"
+ );
auto subclient = Client->GetClientForEndpoint(clusterSessionInfo.ClusterEndpoint);
auto context = subclient->CreateContext();
if (!context) {
AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred);
return;
}
- TStringBuilder logPrefix;
- logPrefix << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] [" << clusterName << "] ";
- TLog log = Log;
- log.SetFormatter(GetPrefixLogFormatter(logPrefix));
clusterSessionInfo.Session =
std::make_shared<TSingleClusterReadSessionImpl>(
sessionSettings,
+ DbDriverState->Database,
+ SessionId,
clusterName,
- log,
+ Log,
subclient->CreateReadSessionConnectionProcessorFactory(),
EventsQueue,
ErrorHandler,
@@ -235,8 +235,11 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
}
TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
if (retryDelay) {
- Log << TLOG_INFO << "Cluster discovery request failed. Status: " << status.GetStatus()
- << ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\"";
+ Log.Write(
+ TLOG_INFO,
+ GetLogPrefix() << "Cluster discovery request failed. Status: " << status.GetStatus()
+ << ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\""
+ );
RestartClusterDiscoveryImpl(*retryDelay, deferred);
} else {
AbortImpl(status.GetStatus(), MakeIssueWithSubIssues("Failed to discover clusters", status.GetIssues()), deferred);
@@ -244,7 +247,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
return;
}
- Log << TLOG_DEBUG << "Cluster discovery request succeeded";
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Cluster discovery request succeeded");
ClusterDiscoveryRetryState = nullptr;
// Init ClusterSessions.
@@ -319,7 +322,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
}
void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred) {
- Log << TLOG_DEBUG << "Restart cluster discovery in " << delay;
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay);
auto startCallback = [self = weak_from_this()](bool ok) {
if (ok) {
if (auto sharedSelf = self.lock()) {
@@ -339,7 +342,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions
}
bool TReadSession::Close(TDuration timeout) {
- Log << TLOG_INFO << "Closing read session. Close timeout: " << timeout;
+ Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
with_lock (Lock) {
Cancel(ClusterDiscoveryDelayContext);
Cancel(DumpCountersContext);
@@ -424,7 +427,7 @@ bool TReadSession::Close(TDuration timeout) {
void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions& deferred) {
if (!Aborting) {
Aborting = true;
- Log << TLOG_NOTICE << "Aborting read session. Description: " << closeEvent.DebugString();
+ Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
Cancel(ClusterDiscoveryDelayContext);
Cancel(DumpCountersContext);
for (auto& [cluster, sessionInfo] : ClusterSessions) {
@@ -488,7 +491,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB
}
void TReadSession::StopReadingData() {
- Log << TLOG_INFO << "Stop reading data";
+ Log.Write(TLOG_INFO, GetLogPrefix() << "Stop reading data");
with_lock (Lock) {
if (!DataReadingSuspended) {
DataReadingSuspended = true;
@@ -503,7 +506,7 @@ void TReadSession::StopReadingData() {
}
void TReadSession::ResumeReadingData() {
- Log << TLOG_INFO << "Resume reading data";
+ Log.Write(TLOG_INFO, GetLogPrefix() << "Resume reading data");
with_lock (Lock) {
if (DataReadingSuspended) {
DataReadingSuspended = false;
@@ -530,7 +533,7 @@ static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event)
}
void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) {
- Log << GetEventLogPriority(event) << "Read session event " << DebugString(event);
+ Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event));
}
void TReadSession::MakeCountersIfNeeded() {
@@ -583,8 +586,8 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) {
/**/
if (logCounters) {
- Log << TLOG_INFO
- << "Counters: {"
+ Log.Write(TLOG_INFO,
+ GetLogPrefix() << "Counters: {"
C(Errors)
C(CurrentSessionLifetimeMs)
C(BytesRead)
@@ -594,7 +597,8 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) {
C(BytesInflightCompressed)
C(BytesInflightTotal)
C(MessagesInflight)
- << " }";
+ << " }"
+ );
}
#undef C
@@ -686,6 +690,10 @@ void TPartitionStreamImpl::SignalReadyEvents(TReadSessionEventsQueue* queue, TDe
}
}
+TStringBuilder TSingleClusterReadSessionImpl::GetLogPrefix() const {
+ return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] ";
+}
+
void TSingleClusterReadSessionImpl::Start() {
Settings.DecompressionExecutor_->Start();
Settings.EventHandlers_.HandlersExecutor_->Start();
@@ -709,7 +717,11 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) {
NGrpc::IQueueClientContextPtr prevConnectDelayContext;
if (!status.Ok()) {
- Log << TLOG_INFO << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues);
+ Log.Write(
+ TLOG_INFO,
+ GetLogPrefix() << "Got error. Status: " << status.Status
+ << ". Description: " << IssuesSingleLineString(status.Issues)
+ );
}
TDeferredActions deferred;
@@ -731,7 +743,10 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) {
if (!delayContext) {
return false;
}
- Log << TLOG_DEBUG << "Reconnecting session to cluster " << ClusterName << " in "<< delay;
+ Log.Write(
+ TLOG_DEBUG,
+ GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in "<< delay
+ );
} else {
return false;
}
@@ -788,7 +803,11 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) {
}
void TSingleClusterReadSessionImpl::BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions& deferred) {
- Log << TLOG_INFO << "Break connection due to unexpected message from server. Status: " << status.Status << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\"";
+ Log.Write(
+ TLOG_INFO,
+ GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status
+ << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\""
+ );
Processor->Cancel();
Processor = nullptr;
@@ -861,7 +880,7 @@ void TSingleClusterReadSessionImpl::OnConnect(TPlainStatus&& st, typename IProce
}
void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions& deferred) { // Assumes that we're under lock.
- Log << TLOG_DEBUG << "Successfully connected. Initializing session";
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req;
auto& init = *req.mutable_init_request();
init.set_ranges_mode(RangesMode);
@@ -917,14 +936,21 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStreamCreate(const TPartitio
if (commitOffset) {
commitOffsetLogStr << ". Commit offset: " << *commitOffset;
}
- Log << TLOG_INFO << "Confirm partition stream create. Partition stream id: " << partitionStream->GetPartitionStreamId()
- << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath()
- << "\". Partition: " << partitionStream->GetPartitionId()
- << ". Read offset: " << readOffset << commitOffsetLogStr;
+ Log.Write(
+ TLOG_INFO,
+ GetLogPrefix() << "Confirm partition stream create. Partition stream id: " << partitionStream->GetPartitionStreamId()
+ << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath()
+ << "\". Partition: " << partitionStream->GetPartitionId()
+ << ". Read offset: " << readOffset << commitOffsetLogStr
+ );
with_lock (Lock) {
if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation.
- Log << TLOG_DEBUG << "Skip partition stream create confirm. Partition stream id: " << partitionStream->GetPartitionStreamId();
+ Log.Write(
+ TLOG_DEBUG,
+ GetLogPrefix() << "Skip partition stream create confirm. Partition stream id: "
+ << partitionStream->GetPartitionStreamId()
+ );
return;
}
@@ -946,14 +972,22 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStreamCreate(const TPartitio
}
void TSingleClusterReadSessionImpl::ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream) {
- Log << TLOG_INFO << "Confirm partition stream destroy. Partition stream id: " << partitionStream->GetPartitionStreamId()
- << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath()
- << "\". Partition: " << partitionStream->GetPartitionId();
+ Log.Write(
+ TLOG_INFO,
+ GetLogPrefix() << "Confirm partition stream destroy. Partition stream id: "
+ << partitionStream->GetPartitionStreamId()
+ << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath()
+ << "\". Partition: " << partitionStream->GetPartitionId()
+ );
TDeferredActions deferred;
with_lock (Lock) {
if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation.
- Log << TLOG_DEBUG << "Skip partition stream destroy confirm. Partition stream id: " << partitionStream->GetPartitionStreamId();
+ Log.Write(
+ TLOG_DEBUG,
+ GetLogPrefix() << "Skip partition stream destroy confirm. Partition stream id: "
+ << partitionStream->GetPartitionStreamId()
+ );
return;
}
@@ -973,7 +1007,11 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStreamDestroy(TPartitionStre
}
void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partitionStream, ui64 startOffset, ui64 endOffset) {
- Log << TLOG_DEBUG << "Commit offsets [" << startOffset << ", " << endOffset << "). Partition stream id: " << partitionStream->GetPartitionStreamId();
+ Log.Write(
+ TLOG_DEBUG,
+ GetLogPrefix() << "Commit offsets [" << startOffset << ", " << endOffset
+ << "). Partition stream id: " << partitionStream->GetPartitionStreamId()
+ );
with_lock (Lock) {
if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation.
return;
@@ -1004,7 +1042,10 @@ void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partition
}
void TSingleClusterReadSessionImpl::RequestPartitionStreamStatus(const TPartitionStreamImpl* partitionStream) {
- Log << TLOG_DEBUG << "Requesting status for partition stream id: " << partitionStream->GetPartitionStreamId();
+ Log.Write(
+ TLOG_DEBUG,
+ GetLogPrefix() << "Requesting status for partition stream id: " << partitionStream->GetPartitionStreamId()
+ );
with_lock (Lock) {
if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation.
return;
@@ -1022,7 +1063,7 @@ void TSingleClusterReadSessionImpl::RequestPartitionStreamStatus(const TPartitio
}
void TSingleClusterReadSessionImpl::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) {
- Log << TLOG_DEBUG << "Read session event " << DebugString(event);
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Read session event " << DebugString(event));
const i64 bytesCount = static_cast<i64>(CalcDataSize(event));
Y_ASSERT(bytesCount >= 0);
@@ -1142,7 +1183,7 @@ void TSingleClusterReadSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus,
void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::InitResponse&& msg, TDeferredActions& deferred) { // Assumes that we're under lock.
Y_UNUSED(deferred);
- Log << TLOG_INFO << "Server session id: " << msg.session_id();
+ Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
// Successful init. Do nothing.
ContinueReadingDataImpl();
@@ -1266,7 +1307,7 @@ void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::Migration
void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Committed&& msg, TDeferredActions& deferred) { // Assumes that we're under lock.
- Log << TLOG_DEBUG << "Committed response: " << msg;
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
TMap<ui64, TIntrusivePtr<TPartitionStreamImpl>> partitionStreams;
for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) {
@@ -1367,7 +1408,7 @@ void TSingleClusterReadSessionImpl::OnDataDecompressed(i64 sourceSize, i64 estim
}
void TSingleClusterReadSessionImpl::Abort() {
- Log << TLOG_DEBUG << "Abort session to cluster";
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster");
with_lock (Lock) {
if (!Aborting) {
@@ -2097,10 +2138,10 @@ bool TDataDecompressionInfo::TakeData(const TIntrusivePtr<TPartitionStreamImpl>&
} while (CurrentReadingMessage.first < static_cast<size_t>(msg.batches_size()) && msg.batches(CurrentReadingMessage.first).message_data_size() == 0);
}
}
- partitionStream->GetLog() << TLOG_DEBUG << "Take Data. Partition " << partitionStream->GetPartitionId()
+ partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Take Data. Partition " << partitionStream->GetPartitionId()
<< ". Read: {" << prevReadingMessage.first << ", " << prevReadingMessage.second << "} -> {"
<< CurrentReadingMessage.first << ", " << CurrentReadingMessage.second << "} ("
- << minOffset << "-" << maxOffset << ")";
+ << minOffset << "-" << maxOffset << ")");
return CurrentReadingMessage <= *readyThreshold;
}
@@ -2175,7 +2216,10 @@ void TDataDecompressionInfo::TDecompressionTask::operator()() {
}
}
if (auto session = Parent->Session.lock()) {
- session->GetLog() << TLOG_DEBUG << "Decompression task done. Partition: " << partition << " (" << minOffset << "-" << maxOffset << ")";
+ session->GetLog().Write(
+ TLOG_DEBUG,
+ TStringBuilder() << "Decompression task done. Partition: " << partition << " (" << minOffset << "-" << maxOffset << ")"
+ );
}
Y_ASSERT(dataProcessed == SourceDataSize);
std::shared_ptr<TSingleClusterReadSessionImpl> session = Parent->Session.lock();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index e08db09456..5a62afd7e3 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -759,6 +759,8 @@ public:
TSingleClusterReadSessionImpl(
const TReadSessionSettings& settings,
+ const TString& database,
+ const TString& sessionId,
const TString& clusterName,
const TLog& log,
std::shared_ptr<IReadSessionConnectionProcessorFactory> connectionFactory,
@@ -768,6 +770,8 @@ public:
ui64 partitionStreamIdStart, ui64 partitionStreamIdStep
)
: Settings(settings)
+ , Database(database)
+ , SessionId(sessionId)
, ClusterName(clusterName)
, Log(log)
, NextPartitionStreamId(partitionStreamIdStart)
@@ -808,6 +812,8 @@ public:
void DumpStatisticsToLog(TLogElement& log);
void UpdateMemoryUsageStatistics();
+ TStringBuilder GetLogPrefix() const;
+
const TLog& GetLog() const {
return Log;
}
@@ -956,6 +962,8 @@ private:
private:
const TReadSessionSettings Settings;
+ const TString Database;
+ const TString SessionId;
const TString ClusterName;
TLog Log;
ui64 NextPartitionStreamId;
@@ -1064,6 +1072,8 @@ public:
void ClearAllEvents();
private:
+ TStringBuilder GetLogPrefix() const;
+
// Start
bool ValidateSettings();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 0a338cfc1d..1259126b66 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -85,10 +85,14 @@ void TWriteSession::Start(const TDuration& delay) {
TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& status) {
THandleResult result;
if (AtomicGet(Aborting)) {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session is aborting and will not restart";
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
return result;
}
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues);
+ DbDriverState->Log.Write(
+ TLOG_INFO,
+ LogPrefix() << "Got error. Status: " << status.Status
+ << ". Description: " << IssuesSingleLineString(status.Issues)
+ );
SessionEstablished = false;
TMaybe<TDuration> nextDelay = TDuration::Zero();
if (!RetryState) {
@@ -99,11 +103,14 @@ TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& stat
if (nextDelay) {
result.StartDelay = *nextDelay;
result.DoRestart = true;
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms";
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms"
+ );
ResetForRetryImpl();
} else {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session will not restart after a fatal error";
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
result.DoStop = true;
CheckHandleResultImpl(result);
}
@@ -116,7 +123,7 @@ bool IsFederation(const TString& endpoint) {
}
void TWriteSession::DoCdsRequest(TDuration delay) {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session: Do CDS request";
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: Do CDS request");
auto weakThis = weak_from_this();
if (
@@ -149,10 +156,10 @@ void TWriteSession::DoCdsRequest(TDuration delay) {
params->set_preferred_cluster_name(*Settings.PreferredCluster_);
auto weakConnections = std::weak_ptr<TGRpcConnectionsImpl>(Connections);
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n";
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n");
auto cdsRequestCall = [req_=std::move(req), extr=std::move(extractor), weakConnections, dbState=DbDriverState, settings=Settings]() mutable {
if (auto connections = weakConnections.lock()) {
- dbState->Log << TLOG_INFO << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n";
+ dbState->Log.Write(TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n");
connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService,
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest,
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>(
@@ -171,7 +178,7 @@ void TWriteSession::DoCdsRequest(TDuration delay) {
void TWriteSession::OnCdsResponse(
TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result
) {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Got CDS response: \n" << result.ShortDebugString();
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString());
TString endpoint, name;
THandleResult handleResult;
if (!status.IsSuccess()) {
@@ -266,7 +273,7 @@ void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet
NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() {
if (Settings.ValidateSeqNo_) {
if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) {
- DbDriverState->Log << TLOG_ERR << LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode";
+ DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode");
}
else
@@ -302,17 +309,23 @@ ui64 TWriteSession::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
}
if (seqNo.Defined()) {
if (*AutoSeqNoMode) {
- DbDriverState->Log << TLOG_ERR << LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode";
+ DbDriverState->Log.Write(
+ TLOG_ERR,
+ LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
+ );
ThrowFatalError(
- "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
+ "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
);
} else {
seqNoValue = *seqNo;
}
} else if (!(*AutoSeqNoMode)) {
- DbDriverState->Log << TLOG_ERR << LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode";
+ DbDriverState->Log.Write(
+ TLOG_ERR,
+ LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
+ );
ThrowFatalError(
- "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
+ "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
);
}
LastSeqNo = seqNoValue;
@@ -378,7 +391,7 @@ TWriteSession::THandleResult TWriteSession::OnErrorImpl(NYdb::TPlainStatus&& sta
// No lock
void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint;
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
NGrpc::IQueueClientContextPtr prevConnectContext;
NGrpc::IQueueClientContextPtr prevConnectTimeoutContext;
@@ -457,7 +470,7 @@ void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) {
// RPC callback.
void TWriteSession::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) {
- DbDriverState->Log << TLOG_ERR << LogPrefix() << "Write session: connect timeout";
+ DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout");
THandleResult handleResult;
with_lock (Lock) {
if (ConnectTimeoutContext == connectTimeoutContext) {
@@ -536,7 +549,7 @@ void TWriteSession::InitImpl() {
for (const auto& attr : Settings.Meta_.Fields) {
(*init->mutable_session_meta())[attr.first] = attr.second;
}
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: send init request: "<< req.ShortDebugString();
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString());
WriteToProcessorImpl(std::move(req));
}
@@ -635,7 +648,7 @@ void TWriteSession::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectio
ProcessHandleResult(processResult.HandleResult);
}
-TString TWriteSession::LogPrefix() const {
+TStringBuilder TWriteSession::LogPrefix() const {
return TStringBuilder() << "MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] ";
}
@@ -676,7 +689,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl(
}
case TServerMessage::kInitResponse: {
const auto& initResponse = ServerMessage->init_response();
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString();
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString());
SessionId = initResponse.session_id();
PartitionId = initResponse.partition_id();
ui64 newLastSeqNo = initResponse.last_sequence_number();
@@ -702,7 +715,10 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl(
case TServerMessage::kBatchWriteResponse: {
TWriteSessionEvent::TAcksEvent acksEvent;
const auto& batchWriteResponse = ServerMessage->batch_write_response();
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString();
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString()
+ );
TWriteStat::TPtr writeStat = new TWriteStat{};
const auto& stat = batchWriteResponse.write_statistics();
writeStat->WriteTime = TDuration::MilliSeconds(stat.persist_duration_ms());
@@ -735,7 +751,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl(
}
case TServerMessage::kUpdateTokenResponse: {
UpdateTokenInProgress = false;
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: token updated successfully";
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully");
UpdateTokenIfNeededImpl();
break;
}
@@ -745,7 +761,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl(
bool TWriteSession::CleanupOnAcknowledged(ui64 sequenceNumber) {
bool result = false;
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: acknoledged message " << sequenceNumber;
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
UpdateTimedCountersImpl();
const auto& sentFront = SentOriginalMessages.front();
ui64 size = 0;
@@ -797,10 +813,17 @@ TMemoryUsageChange TWriteSession::OnMemoryUsageChangedImpl(i64 diff) {
bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_;
if (wasOk != nowOk) {
if (wasOk) {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Estimated memory usage " << MemoryUsage << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])";
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Estimated memory usage " << MemoryUsage
+ << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])"
+ );
}
else {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]";
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"
+ );
}
}
return {wasOk, nowOk};
@@ -918,8 +941,11 @@ void TWriteSession::FlushWriteIfRequiredImpl() {
// Involves compression, but still called under lock.
size_t TWriteSession::WriteBatchImpl() {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from "
- << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo;
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from "
+ << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo
+ );
Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount);
@@ -991,7 +1017,7 @@ bool TWriteSession::IsReadyToSendNextImpl() const {
void TWriteSession::UpdateTokenIfNeededImpl() {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: try to update token";
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token");
if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress)
return;
@@ -1004,7 +1030,7 @@ void TWriteSession::UpdateTokenIfNeededImpl() {
updateRequest->set_token(token);
PrevToken = token;
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: updating token";
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token");
Processor->Write(std::move(clientMessage));
}
@@ -1053,9 +1079,12 @@ void TWriteSession::SendImpl() {
PackedMessagesToSend.pop();
}
UpdateTokenIfNeededImpl();
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) ("
- << OriginalMessagesToSend.size() << " left), first sequence number is "
- << writeRequest->sequence_numbers(0);
+ DbDriverState->Log.Write(
+ TLOG_DEBUG,
+ LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) ("
+ << OriginalMessagesToSend.size() << " left), first sequence number is "
+ << writeRequest->sequence_numbers(0)
+ );
Processor->Write(std::move(clientMessage));
}
}
@@ -1064,7 +1093,10 @@ void TWriteSession::SendImpl() {
bool TWriteSession::Close(TDuration closeTimeout) {
if (AtomicGet(Aborting))
return false;
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms";
+ DbDriverState->Log.Write(
+ TLOG_INFO,
+ LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms"
+ );
auto startTime = TInstant::Now();
auto remaining = closeTimeout;
bool ready = false;
@@ -1094,9 +1126,13 @@ bool TWriteSession::Close(TDuration closeTimeout) {
InitSeqNoPromise.SetException("session closed");
}
if (ready) {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session: gracefully shut down, all writes complete";
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
} else {
- DbDriverState->Log << TLOG_WARNING << LogPrefix() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown";
+ DbDriverState->Log.Write(
+ TLOG_WARNING,
+ LogPrefix() << "Write session: could not confirm all writes in time"
+ << " or session aborted, perform hard shutdown"
+ );
}
return ready;
}
@@ -1153,18 +1189,19 @@ void TWriteSession::UpdateTimedCountersImpl() {
<< Counters->counter->Val() \
/**/
- DbDriverState->Log << TLOG_INFO << LogPrefix()
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix()
<< "Counters: {"
- LOG_COUNTER(Errors)
- LOG_COUNTER(CurrentSessionLifetimeMs)
- LOG_COUNTER(BytesWritten)
- LOG_COUNTER(MessagesWritten)
- LOG_COUNTER(BytesWrittenCompressed)
- LOG_COUNTER(BytesInflightUncompressed)
- LOG_COUNTER(BytesInflightCompressed)
- LOG_COUNTER(BytesInflightTotal)
- LOG_COUNTER(MessagesInflight)
- << " }";
+ LOG_COUNTER(Errors)
+ LOG_COUNTER(CurrentSessionLifetimeMs)
+ LOG_COUNTER(BytesWritten)
+ LOG_COUNTER(MessagesWritten)
+ LOG_COUNTER(BytesWrittenCompressed)
+ LOG_COUNTER(BytesInflightUncompressed)
+ LOG_COUNTER(BytesInflightCompressed)
+ LOG_COUNTER(BytesInflightTotal)
+ LOG_COUNTER(MessagesInflight)
+ << " }"
+ );
#undef LOG_COUNTER
}
@@ -1172,7 +1209,7 @@ void TWriteSession::UpdateTimedCountersImpl() {
void TWriteSession::AbortImpl() {
if (!AtomicGet(Aborting)) {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: aborting";
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting");
AtomicSet(Aborting, 1);
Cancel(ConnectContext);
Cancel(ConnectTimeoutContext);
@@ -1184,7 +1221,7 @@ void TWriteSession::AbortImpl() {
}
void TWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session will now close";
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
AbortImpl();
}
@@ -1196,13 +1233,13 @@ void TWriteSession::CloseImpl(EStatus statusCode, const TString& message) {
}
void TWriteSession::CloseImpl(TPlainStatus&& status) {
- DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session will now close";
+ DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close");
EventsQueue->Close(TSessionClosedEvent(std::move(status)));
AbortImpl();
}
TWriteSession::~TWriteSession() {
- DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: destroy";
+ DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy");
bool needClose = false;
with_lock(Lock) {
if (!AtomicGet(Aborting)) {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
index 6f506a0080..05824e111c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
@@ -314,7 +314,7 @@ public:
private:
- TString LogPrefix() const;
+ TStringBuilder LogPrefix() const;
void UpdateTokenIfNeededImpl();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index ee6a5fc64a..e0a6a4a0ca 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -1296,6 +1296,8 @@ struct TReadSessionSettings : public TRequestSettings<TReadSessionSettings> {
FLUENT_SETTING_VECTOR(TString, Clusters);
FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
+
+ FLUENT_SETTING_OPTIONAL(TLog, Log);
};
//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 002dd47290..3ebfd50a65 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -619,6 +619,8 @@ TSingleClusterReadSessionImpl* TReadSessionImplTestSetup::GetSession() {
}
Session = std::make_shared<TSingleClusterReadSessionImpl>(
Settings,
+ "db",
+ "sessionid",
ClusterName,
Log,
MockProcessorFactory,