diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-18 18:40:21 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-18 18:40:21 +0300 |
commit | 588fc4b8dc6cde4e910a512034844b3166341b6b (patch) | |
tree | 87ee5694964df100ffdb0eeea873b755026f7718 | |
parent | df455b910e1d049369547fe5c4e83417eb7a73a1 (diff) | |
download | ydb-588fc4b8dc6cde4e910a512034844b3166341b6b.tar.gz |
merge: enable logs in mirrorer and some fixes
do not override log formatter inside ydb_persqueue sdk
fix
fix
REVIEW: 2435689
more information from mirrorer & fix commit problem LOGBROKER-7400
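1) исправлена проблема коммита офсета - коммит делается через оригинальное вычитанное сообщение [EN: fixed the offset commit problem - the commit is now made through the original message that was read]
2) отслеживается самая старая фьюча по которой нет ответа [EN: the oldest future that has not yet received a response is tracked]
3) правильная обработка WaitNextReaderEventInFlight [EN: correct handling of WaitNextReaderEventInFlight]
4) постоянно запрашивается partition status [EN: partition status is requested continuously]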
REVIEW: 2435480
enable sdk logs in mirrorer LOGBROKER-7400
enable logs
REVIEW: 2432795
REVIEW: 2438061
x-ydb-stable-ref: 1f12597d5956d27d42acad88a0e9d13e9e0b4bff
21 files changed, 402 insertions, 230 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 45dc30fa75..aef92dc043 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1878,22 +1878,6 @@ void TPersQueueClusterTrackerInitializer::InitializeServices(NActors::TActorSyst TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId))); } -// TPersQueueLibSharedInstanceInitializer - -TPersQueueLibSharedInstanceInitializer::TPersQueueLibSharedInstanceInitializer(const TKikimrRunConfig& runConfig) - : IKikimrServicesInitializer(runConfig) -{} - -void TPersQueueLibSharedInstanceInitializer::InitializeServices(NActors::TActorSystemSetup*, const NKikimr::TAppData* appData) { - if (Config.HasPQConfig() && Config.GetPQConfig().GetEnabled()) { - if (Config.GetPQConfig().GetMirrorConfig().GetEnabled()) { - if (appData->PersQueueMirrorReaderFactory) { - appData->PersQueueMirrorReaderFactory->Initialize(Config.GetPQConfig().GetMirrorConfig().GetPQLibSettings()); - } - } - } -} - // TMemProfMonitorInitializer TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig) diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 6265bcc03b..b3ac9b9cbe 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -353,13 +353,6 @@ public: void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; -class TPersQueueLibSharedInstanceInitializer : public IKikimrServicesInitializer { -public: - TPersQueueLibSharedInstanceInitializer(const TKikimrRunConfig& runConfig); - - void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; -}; - class TMemProfMonitorInitializer : public IKikimrServicesInitializer { 
public: TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index c88e7b8fdc..7553c4cfe7 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1128,6 +1128,18 @@ void TKikimrRunner::InitializeActorSystem( if (YqSharedResources) { YqSharedResources->Init(ActorSystem.Get()); } + + if (runConfig.AppConfig.HasPQConfig()) { + const auto& pqConfig = runConfig.AppConfig.GetPQConfig(); + if (pqConfig.GetEnabled() && pqConfig.GetMirrorConfig().GetEnabled()) { + if (AppData->PersQueueMirrorReaderFactory) { + AppData->PersQueueMirrorReaderFactory->Initialize( + ActorSystem.Get(), + pqConfig.GetMirrorConfig().GetPQLibSettings() + ); + } + } + } } TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializersList( @@ -1263,8 +1275,6 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TPersQueueClusterTrackerInitializer(runConfig)); } - sil->AddServiceInitializer(new TPersQueueLibSharedInstanceInitializer(runConfig)); - sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig)); #if defined(ENABLE_MEMORY_TRACKING) diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h index 2f8a8d4ab8..1bf0ed9a4c 100644 --- a/ydb/core/persqueue/actor_persqueue_client_iface.h +++ b/ydb/core/persqueue/actor_persqueue_client_iface.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/protos/pqconfig.pb.h> +#include <ydb/library/logger/actor.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> @@ -14,7 +15,21 @@ namespace NKikimr::NPQ { class IPersQueueMirrorReaderFactory { public: - virtual void Initialize(const NKikimrPQ::TPQConfig::TPQLibSettings& settings) const = 0; + IPersQueueMirrorReaderFactory() + : 
ActorSystemPtr(std::make_shared<TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr)) + {} + + virtual void Initialize( + NActors::TActorSystem* actorSystem, + const NKikimrPQ::TPQConfig::TPQLibSettings& settings + ) const { + Y_VERIFY(!ActorSystemPtr->load(std::memory_order_relaxed), "Double init"); + ActorSystemPtr->store(actorSystem, std::memory_order_relaxed); + + auto driverConfig = NYdb::TDriverConfig() + .SetNetworkThreadsNum(settings.GetThreadsCount()); + Driver = std::make_shared<NYdb::TDriver>(driverConfig); + } virtual std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider( const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred @@ -24,20 +39,23 @@ public: const NKikimrPQ::TMirrorPartitionConfig& config, ui32 partition, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, - ui64 maxMemoryUsageBytes + ui64 maxMemoryUsageBytes, + TMaybe<TLog> logger = Nothing() ) const = 0; virtual ~IPersQueueMirrorReaderFactory() = default; + + TDeferredActorLogBackend::TSharedAtomicActorSystemPtr GetSharedActorSystem() const { + return ActorSystemPtr; + } + +protected: + mutable TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr; + mutable std::shared_ptr<NYdb::TDriver> Driver; }; class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory { public: - void Initialize(const NKikimrPQ::TPQConfig::TPQLibSettings& settings) const override { - auto driverConfig = NYdb::TDriverConfig() - .SetNetworkThreadsNum(settings.GetThreadsCount()); - Driver = std::make_shared<NYdb::TDriver>(driverConfig); - } - std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider( const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred ) const override { @@ -55,7 +73,8 @@ public: const NKikimrPQ::TMirrorPartitionConfig& config, ui32 partition, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, - ui64 maxMemoryUsageBytes + ui64 maxMemoryUsageBytes, + TMaybe<TLog> 
logger = Nothing() ) const override { NYdb::NPersQueue::TPersQueueClientSettings clientSettings = NYdb::NPersQueue::TPersQueueClientSettings() .DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort()) @@ -72,6 +91,9 @@ public: .DisableClusterDiscovery(true) .ReadOnlyOriginal(true) .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()); + if (logger) { + settings.Log(logger.GetRef()); + } if (config.HasReadFromTimestampsMs()) { settings.StartingMessageTimestamp(TInstant::MilliSeconds(config.GetReadFromTimestampsMs())); } @@ -82,9 +104,6 @@ public: NYdb::NPersQueue::TPersQueueClient persQueueClient(*Driver, clientSettings); return persQueueClient.CreateReadSession(settings); } - -private: - mutable std::shared_ptr<NYdb::TDriver> Driver; }; } // namespace NKikimr::NSQS diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index ced72293ac..de9b74e75b 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -618,8 +618,10 @@ struct TEvPQ { }; struct TEvReaderEventArrived : public TEventLocal<TEvReaderEventArrived, EvReaderEventArrived> { - TEvReaderEventArrived() + TEvReaderEventArrived(ui64 id) : Id(id) {} + + ui64 Id; }; }; diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index 37aea95ad1..9873e3664a 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -167,11 +167,9 @@ void TMirrorer::ProcessWriteResponse( ) { Y_VERIFY_S(response.CmdWriteResultSize() == WriteInFlight.size(), MirrorerDescription() << "CmdWriteResultSize=" << response.CmdWriteResultSize() << ", but expected=" << WriteInFlight.size() - << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().Offset) + << ". First expected offset= " << (WriteInFlight.empty() ? 
-1 : WriteInFlight.front().GetOffset(0)) << " response: " << response); - - NYdb::NPersQueue::TDeferredCommit deferredCommit; for (auto& result : response.GetCmdWriteResult()) { if (result.GetAlreadyWritten()) { Y_VERIFY_S( @@ -185,22 +183,19 @@ void TMirrorer::ProcessWriteResponse( } auto& writtenMessageInfo = WriteInFlight.front(); if (MirrorerTimeLags) { - TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.WriteTime; + TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.GetWriteTime(0); MirrorerTimeLags->IncFor(lag.MilliSeconds(), 1); } - ui64 offset = writtenMessageInfo.Offset; + ui64 offset = writtenMessageInfo.GetOffset(0); Y_VERIFY((ui64)result.GetOffset() == offset); Y_VERIFY_S(EndOffset <= offset, MirrorerDescription() << "end offset more the written " << EndOffset << ">" << offset); EndOffset = offset + 1; - BytesInFlight -= writtenMessageInfo.Size; + BytesInFlight -= writtenMessageInfo.GetData().size(); - if (PartitionStream) { - deferredCommit.Add(PartitionStream, offset); - } + WriteInFlight.front().Commit(); WriteInFlight.pop_front(); } - deferredCommit.Commit(); AfterSuccesWrite(ctx); } @@ -266,6 +261,17 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte << "[STATE] bytes in flight " << BytesInFlight << ", messages in write request " << WriteInFlight.size() << ", queue to write: " << Queue.size()); + LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() + << "[STATE] read futures inflight " << ReadFuturesInFlight << ", last id=" << ReadFeatureId); + if (!ReadFeatures.empty()) { + const auto& oldest = *ReadFeatures.begin(); + const auto& info = oldest.second; + LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() + << "[STATE] The oldest read future id=" << oldest.first + << ", ts=" << info.first << " age=" << (ctx.Now() - info.first) + << ", future state: " << info.second.Initialized() + << "/" << 
info.second.HasValue() << "/" << info.second.HasException()); + } } if (!ReadSession && LastInitStageTimestamp + INIT_TIMEOUT < ctx.Now()) { ProcessError(ctx, TStringBuilder() << "read session was not created, the last stage of initialization was at " @@ -319,9 +325,7 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) { req->SetCookie(WRITE_REQUEST_COOKIE); while (!Queue.empty() && AddToWriteRequest(*req, Queue.front())) { - auto& message = Queue.front(); - ui64 dataSize = message.GetData().size(); - WriteInFlight.emplace_back(message.GetOffset(0), dataSize, message.GetWriteTime(0)); + WriteInFlight.emplace_back(std::move(Queue.front())); Queue.pop_front(); } @@ -386,7 +390,17 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont auto factory = AppData(ctx)->PersQueueMirrorReaderFactory; Y_VERIFY(factory); - ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT); + TLog log(MakeHolder<TDeferredActorLogBackend>( + factory->GetSharedActorSystem(), + NKikimrServices::PQ_MIRRORER + )); + + TString logPrefix = TStringBuilder() << MirrorerDescription() << "[reader " << ++ReaderGeneration << "] "; + log.SetFormatter([logPrefix](ELogPriority, TStringBuf message) -> TString { + return logPrefix + message; + }); + + ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log); LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " read session created: " << ReadSession->GetSessionId()); @@ -400,13 +414,13 @@ void TMirrorer::RequestSourcePartitionStatus(TEvPQ::TEvRequestPartitionStatus::T } void TMirrorer::RequestSourcePartitionStatus() { - if (Config.GetSyncWriteTime() && PartitionStream && ReadSession) { + if (PartitionStream && ReadSession) { PartitionStream->RequestStatus(); } } void TMirrorer::TryUpdateWriteTimetsamp(const TActorContext &ctx) { - if (!WriteRequestInFlight && StreamStatus && EndOffset == 
StreamStatus->GetEndOffset()) { + if (Config.GetSyncWriteTime() && !WriteRequestInFlight && StreamStatus && EndOffset == StreamStatus->GetEndOffset()) { LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " update write timestamp from original topic: " << StreamStatus->DebugString()); THolder<TEvPersQueue::TEvRequest> request = MakeHolder<TEvPersQueue::TEvRequest>(); @@ -438,6 +452,10 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) { LastInitStageTimestamp = ctx.Now(); ReadSession = nullptr; PartitionStream = nullptr; + ReadFuturesInFlight = 0; + ReadFeatures.clear(); + WaitNextReaderEventInFlight = false; + Become(&TThis::StateInitConsumer); LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " schedule consumer creation"); @@ -469,18 +487,33 @@ void TMirrorer::StartWaitNextReaderEvent(const TActorContext& ctx) { if (WaitNextReaderEventInFlight) { return; } + WaitNextReaderEventInFlight = true; + + ui64 futureId = ++ReadFeatureId; + ++ReadFuturesInFlight; auto future = ReadSession->WaitEvent(); + future.Subscribe( [ actorSystem = ctx.ExecutorThread.ActorSystem, - selfId = SelfId() + selfId = SelfId(), + futureId=futureId ](const NThreading::TFuture<void>&) { - actorSystem->Send(new NActors::IEventHandle(selfId, selfId, new TEvPQ::TEvReaderEventArrived())); + actorSystem->Send(new NActors::IEventHandle(selfId, selfId, new TEvPQ::TEvReaderEventArrived(futureId))); } ); + + if (ReadFeatures.size() < MAX_READ_FUTURES_STORE) { + ReadFeatures[futureId] = {ctx.Now(), future}; + } } -void TMirrorer::ProcessNextReaderEvent(TEvPQ::TEvReaderEventArrived::TPtr&, const TActorContext& ctx) { +void TMirrorer::ProcessNextReaderEvent(TEvPQ::TEvReaderEventArrived::TPtr& ev, const TActorContext& ctx) { + { + --ReadFuturesInFlight; + ReadFeatures.erase(ev->Get()->Id); + } + TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false); WaitNextReaderEventInFlight = false; @@ -547,6 +580,10 @@ 
void TMirrorer::ProcessNextReaderEvent(TEvPQ::TEvReaderEventArrived::TPtr&, cons ProcessError(ctx, TStringBuilder() << " read session closed: " << closeSessionEvent->DebugString()); ScheduleConsumerCreation(ctx); return; + } else { + ProcessError(ctx, TStringBuilder() << " got unmatched event: " << event.GetRef().index()); + ScheduleConsumerCreation(ctx); + return; } Send(SelfId(), new TEvents::TEvWakeup()); diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h index 6819891f6c..ff1fb9671b 100644 --- a/ydb/core/persqueue/mirrorer.h +++ b/ydb/core/persqueue/mirrorer.h @@ -19,6 +19,7 @@ namespace NPQ { class TMirrorer : public TActorBootstrapped<TMirrorer> { private: + const ui64 MAX_READ_FUTURES_STORE = 25; const ui64 MAX_BYTES_IN_FLIGHT = 8 * 1024 * 1024; const TDuration WRITE_RETRY_TIMEOUT_MAX = TDuration::Seconds(1); const TDuration WRITE_RETRY_TIMEOUT_START = TDuration::MilliSeconds(1); @@ -45,18 +46,6 @@ private: UPDATE_WRITE_TIMESTAMP = 2 }; - struct TMessageInfo { - ui64 Offset; - ui64 Size; - TInstant WriteTime; - - TMessageInfo(ui64 offset, ui64 size, TInstant writeTime) - : Offset(offset) - , Size(size) - , WriteTime(writeTime) - {} - }; - private: STFUNC(StateInitConsumer) @@ -167,13 +156,14 @@ private: NKikimrPQ::TMirrorPartitionConfig Config; TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> Queue; - TDeque<TMessageInfo> WriteInFlight; + TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight; ui64 BytesInFlight = 0; std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight; TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START; TInstant WriteRequestTimestamp; std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProvider; std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession; + ui64 ReaderGeneration = 0; NYdb::NPersQueue::TPartitionStream::TPtr PartitionStream; 
THolder<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent> StreamStatus; TInstant LastInitStageTimestamp; @@ -193,6 +183,10 @@ private: TMultiCounter InitTimeoutCounter; TMultiCounter WriteTimeoutCounter; THolder<TPercentileCounter> MirrorerTimeLags; + + TMap<ui64, std::pair<TInstant, NThreading::TFuture<void>>> ReadFeatures; + ui64 ReadFeatureId = 0; + ui64 ReadFuturesInFlight = 0; }; }// NPQ diff --git a/ydb/core/persqueue/mirrorer_ut.cpp b/ydb/core/persqueue/mirrorer_ut.cpp index c1114a9958..800a89c491 100644 --- a/ydb/core/persqueue/mirrorer_ut.cpp +++ b/ydb/core/persqueue/mirrorer_ut.cpp @@ -14,7 +14,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) { const auto& settings = server.CleverServer->GetRuntime()->GetAppData().PQConfig.GetMirrorConfig().GetPQLibSettings(); auto fabric = std::make_shared<NKikimr::NPQ::TPersQueueMirrorReaderFactory>(); - fabric->Initialize(settings); + fabric->Initialize(server.CleverServer->GetRuntime()->GetAnyNodeActorSystem(), settings); for (ui32 nodeId = 0; nodeId < server.CleverServer->GetRuntime()->GetNodeCount(); ++nodeId) { server.CleverServer->GetRuntime()->GetAppData(nodeId).PersQueueMirrorReaderFactory = fabric.get(); } diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 50e89676ef..e8acc79a80 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -46,6 +46,7 @@ PEERDIR( ydb/core/persqueue/partition_key_range ydb/core/persqueue/writer ydb/core/protos + ydb/library/logger ydb/library/persqueue/counter_time_keeper ydb/library/persqueue/topic_parser ydb/public/lib/base diff --git a/ydb/core/yq/libs/shared_resources/shared_resources.cpp b/ydb/core/yq/libs/shared_resources/shared_resources.cpp index 3dfd566232..54f5488249 100644 --- a/ydb/core/yq/libs/shared_resources/shared_resources.cpp +++ b/ydb/core/yq/libs/shared_resources/shared_resources.cpp @@ -2,11 +2,9 @@ #include <ydb/core/protos/services.pb.h> #include <ydb/core/yq/libs/events/events.h> +#include 
<ydb/library/logger/actor.h> #include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/log.h> -#include <library/cpp/logger/backend.h> -#include <library/cpp/logger/record.h> #include <util/generic/cast.h> #include <util/generic/strbuf.h> @@ -23,64 +21,8 @@ namespace NYq { namespace { -// Log backend that allows us to create shared YDB driver early (before actor system starts), -// but log to actor system. -class TDeferredActorSystemPtrInitActorLogBackend : public TLogBackend { -public: - using TAtomicActorSystemPtr = std::atomic<NActors::TActorSystem*>; - using TSharedAtomicActorSystemPtr = std::shared_ptr<TAtomicActorSystemPtr>; - - TDeferredActorSystemPtrInitActorLogBackend(TSharedAtomicActorSystemPtr actorSystem, int logComponent) - : ActorSystemPtr(std::move(actorSystem)) - , LogComponent(logComponent) - { - } - - NActors::NLog::EPriority GetActorLogPriority(ELogPriority priority) { - switch (priority) { - case TLOG_EMERG: - return NActors::NLog::PRI_EMERG; - case TLOG_ALERT: - return NActors::NLog::PRI_ALERT; - case TLOG_CRIT: - return NActors::NLog::PRI_CRIT; - case TLOG_ERR: - return NActors::NLog::PRI_ERROR; - case TLOG_WARNING: - return NActors::NLog::PRI_WARN; - case TLOG_NOTICE: - return NActors::NLog::PRI_NOTICE; - case TLOG_INFO: - return NActors::NLog::PRI_INFO; - case TLOG_DEBUG: - return NActors::NLog::PRI_DEBUG; - default: - return NActors::NLog::PRI_TRACE; - } - } - - void WriteData(const TLogRecord& rec) override { - NActors::TActorSystem* actorSystem = ActorSystemPtr->load(std::memory_order_relaxed); - if (Y_LIKELY(actorSystem)) { - LOG_LOG(*actorSystem, GetActorLogPriority(rec.Priority), LogComponent, TString(rec.Data, rec.Len)); - } else { - // Not inited. Temporary write to stderr. 
- TStringBuilder out; - out << TStringBuf(rec.Data, rec.Len) << Endl; - Cerr << out; - } - } - - void ReopenLog() override { - } - -protected: - TSharedAtomicActorSystemPtr ActorSystemPtr; - const int LogComponent; -}; - struct TActorSystemPtrMixin { - TDeferredActorSystemPtrInitActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared<TDeferredActorSystemPtrInitActorLogBackend::TAtomicActorSystemPtr>(nullptr); + NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr); }; struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedResources { @@ -114,7 +56,7 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes cfg.SetGrpcMemoryQuota(config.GetGrpcMemoryQuota()); } cfg.SetDiscoveryMode(NYdb::EDiscoveryMode::Async); // We are in actor system! - cfg.SetLog(MakeHolder<TDeferredActorSystemPtrInitActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK)); + cfg.SetLog(MakeHolder<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK)); return cfg; } diff --git a/ydb/core/yq/libs/shared_resources/ya.make b/ydb/core/yq/libs/shared_resources/ya.make index 46c9a3bb2b..6b1eccd910 100644 --- a/ydb/core/yq/libs/shared_resources/ya.make +++ b/ydb/core/yq/libs/shared_resources/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/core/protos ydb/core/yq/libs/events ydb/core/yq/libs/shared_resources/interface + ydb/library/logger ydb/library/security ydb/public/sdk/cpp/client/ydb_driver ydb/public/sdk/cpp/client/ydb_table diff --git a/ydb/library/logger/actor.cpp b/ydb/library/logger/actor.cpp new file mode 100644 index 0000000000..5c7f244f6e --- /dev/null +++ b/ydb/library/logger/actor.cpp @@ -0,0 +1,47 @@ +#include "actor.h" + +namespace NKikimr { + + TDeferredActorLogBackend::TDeferredActorLogBackend(TSharedAtomicActorSystemPtr actorSystem, int logComponent) + : 
ActorSystemPtr(std::move(actorSystem)) + , LogComponent(logComponent) + { + } + + NActors::NLog::EPriority TDeferredActorLogBackend::GetActorLogPriority(ELogPriority priority) const { + switch (priority) { + case TLOG_EMERG: + return NActors::NLog::PRI_EMERG; + case TLOG_ALERT: + return NActors::NLog::PRI_ALERT; + case TLOG_CRIT: + return NActors::NLog::PRI_CRIT; + case TLOG_ERR: + return NActors::NLog::PRI_ERROR; + case TLOG_WARNING: + return NActors::NLog::PRI_WARN; + case TLOG_NOTICE: + return NActors::NLog::PRI_NOTICE; + case TLOG_INFO: + return NActors::NLog::PRI_INFO; + case TLOG_DEBUG: + return NActors::NLog::PRI_DEBUG; + default: + return NActors::NLog::PRI_TRACE; + } + } + + void TDeferredActorLogBackend::WriteData(const TLogRecord& rec) { + NActors::TActorSystem* actorSystem = ActorSystemPtr->load(std::memory_order_relaxed); + if (Y_LIKELY(actorSystem)) { + LOG_LOG(*actorSystem, GetActorLogPriority(rec.Priority), LogComponent, TString(rec.Data, rec.Len)); + } else { + // Not inited. Temporary write to stderr. + TStringBuilder out; + out << TStringBuf(rec.Data, rec.Len) << Endl; + Cerr << out; + } + } + +} // NKikimr + diff --git a/ydb/library/logger/actor.h b/ydb/library/logger/actor.h new file mode 100644 index 0000000000..bccf2cbdfe --- /dev/null +++ b/ydb/library/logger/actor.h @@ -0,0 +1,32 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/logger/backend.h> +#include <library/cpp/logger/record.h> + + +namespace NKikimr { + +// Log backend that allows us to create shared YDB driver early (before actor system starts), +// but log to actor system. 
+class TDeferredActorLogBackend : public TLogBackend { +public: + using TAtomicActorSystemPtr = std::atomic<NActors::TActorSystem*>; + using TSharedAtomicActorSystemPtr = std::shared_ptr<TAtomicActorSystemPtr>; + + TDeferredActorLogBackend(TSharedAtomicActorSystemPtr actorSystem, int logComponent); + + NActors::NLog::EPriority GetActorLogPriority(ELogPriority priority) const; + + void WriteData(const TLogRecord& rec) override; + + void ReopenLog() override {} + +protected: + TSharedAtomicActorSystemPtr ActorSystemPtr; + const int LogComponent; +}; + +} // NKikimr + diff --git a/ydb/library/logger/ya.make b/ydb/library/logger/ya.make new file mode 100644 index 0000000000..4eb705eac0 --- /dev/null +++ b/ydb/library/logger/ya.make @@ -0,0 +1,14 @@ +LIBRARY()
+
+OWNER(g:kikimr)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/logger
+)
+
+SRCS(
+ actor.cpp
+)
+
+END()
diff --git a/ydb/library/ya.make b/ydb/library/ya.make index 2fb80851bb..7cf5bf8d66 100644 --- a/ydb/library/ya.make +++ b/ydb/library/ya.make @@ -9,6 +9,7 @@ RECURSE( http_proxy keys login + logger mkql_proto naming_conventions pdisk_io diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 3b82d7ec0d..d44a85e527 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -51,13 +51,17 @@ private: std::weak_ptr<TReadSession> Session; }; +TStringBuilder TReadSession::GetLogPrefix() const { + return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; +} + TReadSession::TReadSession(const TReadSessionSettings& settings, std::shared_ptr<TPersQueueClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) : Settings(settings) , SessionId(CreateGuidAsString()) - , Log(dbDriverState->Log) + , Log(settings.Log_.GetOrElse(dbDriverState->Log)) , Client(std::move(client)) , Connections(std::move(connections)) , DbDriverState(std::move(dbDriverState)) @@ -67,12 +71,6 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, } MakeCountersIfNeeded(); - - { - TStringBuilder logPrefix; - logPrefix << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; - Log.SetFormatter(GetPrefixLogFormatter(logPrefix)); - } } TReadSession::~TReadSession() { @@ -99,7 +97,7 @@ void TReadSession::Start() { return; } - Log << TLOG_INFO << "Starting read session"; + Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session"); if (Settings.DisableClusterDiscovery_) { ProceedWithoutClusterDiscovery(); } else { @@ -135,7 +133,7 @@ void TReadSession::StartClusterDiscovery() { return; } - Log << TLOG_DEBUG << "Starting cluster discovery"; + Log.Write(TLOG_DEBUG, GetLogPrefix() << 
"Starting cluster discovery"); ClusterDiscoveryDelayContext = nullptr; } @@ -196,22 +194,24 @@ void TReadSession::CreateClusterSessionsImpl() { if (sessionSettings.MaxMemoryUsageBytes_ > clusterSessionsCount && sessionSettings.MaxMemoryUsageBytes_ != std::numeric_limits<size_t>::max()) { sessionSettings.MaxMemoryUsageBytes_ /= clusterSessionsCount; } - Log << TLOG_DEBUG << "Starting session to cluster " << clusterName << " (" << clusterSessionInfo.ClusterEndpoint << ")"; + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Starting session to cluster " << clusterName + << " (" << clusterSessionInfo.ClusterEndpoint << ")" + ); auto subclient = Client->GetClientForEndpoint(clusterSessionInfo.ClusterEndpoint); auto context = subclient->CreateContext(); if (!context) { AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred); return; } - TStringBuilder logPrefix; - logPrefix << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] [" << clusterName << "] "; - TLog log = Log; - log.SetFormatter(GetPrefixLogFormatter(logPrefix)); clusterSessionInfo.Session = std::make_shared<TSingleClusterReadSessionImpl>( sessionSettings, + DbDriverState->Database, + SessionId, clusterName, - log, + Log, subclient->CreateReadSessionConnectionProcessorFactory(), EventsQueue, ErrorHandler, @@ -235,8 +235,11 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu } TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus()); if (retryDelay) { - Log << TLOG_INFO << "Cluster discovery request failed. Status: " << status.GetStatus() - << ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\""; + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Cluster discovery request failed. Status: " << status.GetStatus() + << ". 
Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\"" + ); RestartClusterDiscoveryImpl(*retryDelay, deferred); } else { AbortImpl(status.GetStatus(), MakeIssueWithSubIssues("Failed to discover clusters", status.GetIssues()), deferred); @@ -244,7 +247,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu return; } - Log << TLOG_DEBUG << "Cluster discovery request succeeded"; + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Cluster discovery request succeeded"); ClusterDiscoveryRetryState = nullptr; // Init ClusterSessions. @@ -319,7 +322,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu } void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred) { - Log << TLOG_DEBUG << "Restart cluster discovery in " << delay; + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay); auto startCallback = [self = weak_from_this()](bool ok) { if (ok) { if (auto sharedSelf = self.lock()) { @@ -339,7 +342,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions } bool TReadSession::Close(TDuration timeout) { - Log << TLOG_INFO << "Closing read session. Close timeout: " << timeout; + Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); with_lock (Lock) { Cancel(ClusterDiscoveryDelayContext); Cancel(DumpCountersContext); @@ -424,7 +427,7 @@ bool TReadSession::Close(TDuration timeout) { void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions& deferred) { if (!Aborting) { Aborting = true; - Log << TLOG_NOTICE << "Aborting read session. Description: " << closeEvent.DebugString(); + Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. 
Description: " << closeEvent.DebugString()); Cancel(ClusterDiscoveryDelayContext); Cancel(DumpCountersContext); for (auto& [cluster, sessionInfo] : ClusterSessions) { @@ -488,7 +491,7 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB } void TReadSession::StopReadingData() { - Log << TLOG_INFO << "Stop reading data"; + Log.Write(TLOG_INFO, GetLogPrefix() << "Stop reading data"); with_lock (Lock) { if (!DataReadingSuspended) { DataReadingSuspended = true; @@ -503,7 +506,7 @@ void TReadSession::StopReadingData() { } void TReadSession::ResumeReadingData() { - Log << TLOG_INFO << "Resume reading data"; + Log.Write(TLOG_INFO, GetLogPrefix() << "Resume reading data"); with_lock (Lock) { if (DataReadingSuspended) { DataReadingSuspended = false; @@ -530,7 +533,7 @@ static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event) } void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) { - Log << GetEventLogPriority(event) << "Read session event " << DebugString(event); + Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event)); } void TReadSession::MakeCountersIfNeeded() { @@ -583,8 +586,8 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { /**/ if (logCounters) { - Log << TLOG_INFO - << "Counters: {" + Log.Write(TLOG_INFO, + GetLogPrefix() << "Counters: {" C(Errors) C(CurrentSessionLifetimeMs) C(BytesRead) @@ -594,7 +597,8 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { C(BytesInflightCompressed) C(BytesInflightTotal) C(MessagesInflight) - << " }"; + << " }" + ); } #undef C @@ -686,6 +690,10 @@ void TPartitionStreamImpl::SignalReadyEvents(TReadSessionEventsQueue* queue, TDe } } +TStringBuilder TSingleClusterReadSessionImpl::GetLogPrefix() const { + return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] "; +} + void TSingleClusterReadSessionImpl::Start() { 
Settings.DecompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); @@ -709,7 +717,11 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) { NGrpc::IQueueClientContextPtr prevConnectDelayContext; if (!status.Ok()) { - Log << TLOG_INFO << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues); + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Got error. Status: " << status.Status + << ". Description: " << IssuesSingleLineString(status.Issues) + ); } TDeferredActions deferred; @@ -731,7 +743,10 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) { if (!delayContext) { return false; } - Log << TLOG_DEBUG << "Reconnecting session to cluster " << ClusterName << " in "<< delay; + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in "<< delay + ); } else { return false; } @@ -788,7 +803,11 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) { } void TSingleClusterReadSessionImpl::BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions& deferred) { - Log << TLOG_INFO << "Break connection due to unexpected message from server. Status: " << status.Status << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\""; + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status + << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\"" + ); Processor->Cancel(); Processor = nullptr; @@ -861,7 +880,7 @@ void TSingleClusterReadSessionImpl::OnConnect(TPlainStatus&& st, typename IProce } void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions& deferred) { // Assumes that we're under lock. - Log << TLOG_DEBUG << "Successfully connected. Initializing session"; + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. 
Initializing session"); Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; auto& init = *req.mutable_init_request(); init.set_ranges_mode(RangesMode); @@ -917,14 +936,21 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStreamCreate(const TPartitio if (commitOffset) { commitOffsetLogStr << ". Commit offset: " << *commitOffset; } - Log << TLOG_INFO << "Confirm partition stream create. Partition stream id: " << partitionStream->GetPartitionStreamId() - << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath() - << "\". Partition: " << partitionStream->GetPartitionId() - << ". Read offset: " << readOffset << commitOffsetLogStr; + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Confirm partition stream create. Partition stream id: " << partitionStream->GetPartitionStreamId() + << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath() + << "\". Partition: " << partitionStream->GetPartitionId() + << ". Read offset: " << readOffset << commitOffsetLogStr + ); with_lock (Lock) { if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - Log << TLOG_DEBUG << "Skip partition stream create confirm. Partition stream id: " << partitionStream->GetPartitionStreamId(); + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Skip partition stream create confirm. Partition stream id: " + << partitionStream->GetPartitionStreamId() + ); return; } @@ -946,14 +972,22 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStreamCreate(const TPartitio } void TSingleClusterReadSessionImpl::ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream) { - Log << TLOG_INFO << "Confirm partition stream destroy. Partition stream id: " << partitionStream->GetPartitionStreamId() - << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath() - << "\". 
Partition: " << partitionStream->GetPartitionId(); + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Confirm partition stream destroy. Partition stream id: " + << partitionStream->GetPartitionStreamId() + << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath() + << "\". Partition: " << partitionStream->GetPartitionId() + ); TDeferredActions deferred; with_lock (Lock) { if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - Log << TLOG_DEBUG << "Skip partition stream destroy confirm. Partition stream id: " << partitionStream->GetPartitionStreamId(); + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Skip partition stream destroy confirm. Partition stream id: " + << partitionStream->GetPartitionStreamId() + ); return; } @@ -973,7 +1007,11 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStreamDestroy(TPartitionStre } void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partitionStream, ui64 startOffset, ui64 endOffset) { - Log << TLOG_DEBUG << "Commit offsets [" << startOffset << ", " << endOffset << "). Partition stream id: " << partitionStream->GetPartitionStreamId(); + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Commit offsets [" << startOffset << ", " << endOffset + << "). Partition stream id: " << partitionStream->GetPartitionStreamId() + ); with_lock (Lock) { if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. 
return; @@ -1004,7 +1042,10 @@ void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partition } void TSingleClusterReadSessionImpl::RequestPartitionStreamStatus(const TPartitionStreamImpl* partitionStream) { - Log << TLOG_DEBUG << "Requesting status for partition stream id: " << partitionStream->GetPartitionStreamId(); + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Requesting status for partition stream id: " << partitionStream->GetPartitionStreamId() + ); with_lock (Lock) { if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. return; @@ -1022,7 +1063,7 @@ void TSingleClusterReadSessionImpl::RequestPartitionStreamStatus(const TPartitio } void TSingleClusterReadSessionImpl::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) { - Log << TLOG_DEBUG << "Read session event " << DebugString(event); + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Read session event " << DebugString(event)); const i64 bytesCount = static_cast<i64>(CalcDataSize(event)); Y_ASSERT(bytesCount >= 0); @@ -1142,7 +1183,7 @@ void TSingleClusterReadSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::InitResponse&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. Y_UNUSED(deferred); - Log << TLOG_INFO << "Server session id: " << msg.session_id(); + Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); // Successful init. Do nothing. ContinueReadingDataImpl(); @@ -1266,7 +1307,7 @@ void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::Migration void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Committed&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. 
- Log << TLOG_DEBUG << "Committed response: " << msg; + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); TMap<ui64, TIntrusivePtr<TPartitionStreamImpl>> partitionStreams; for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) { @@ -1367,7 +1408,7 @@ void TSingleClusterReadSessionImpl::OnDataDecompressed(i64 sourceSize, i64 estim } void TSingleClusterReadSessionImpl::Abort() { - Log << TLOG_DEBUG << "Abort session to cluster"; + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster"); with_lock (Lock) { if (!Aborting) { @@ -2097,10 +2138,10 @@ bool TDataDecompressionInfo::TakeData(const TIntrusivePtr<TPartitionStreamImpl>& } while (CurrentReadingMessage.first < static_cast<size_t>(msg.batches_size()) && msg.batches(CurrentReadingMessage.first).message_data_size() == 0); } } - partitionStream->GetLog() << TLOG_DEBUG << "Take Data. Partition " << partitionStream->GetPartitionId() + partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Take Data. Partition " << partitionStream->GetPartitionId() << ". Read: {" << prevReadingMessage.first << ", " << prevReadingMessage.second << "} -> {" << CurrentReadingMessage.first << ", " << CurrentReadingMessage.second << "} (" - << minOffset << "-" << maxOffset << ")"; + << minOffset << "-" << maxOffset << ")"); return CurrentReadingMessage <= *readyThreshold; } @@ -2175,7 +2216,10 @@ void TDataDecompressionInfo::TDecompressionTask::operator()() { } } if (auto session = Parent->Session.lock()) { - session->GetLog() << TLOG_DEBUG << "Decompression task done. Partition: " << partition << " (" << minOffset << "-" << maxOffset << ")"; + session->GetLog().Write( + TLOG_DEBUG, + TStringBuilder() << "Decompression task done. 
Partition: " << partition << " (" << minOffset << "-" << maxOffset << ")" + ); } Y_ASSERT(dataProcessed == SourceDataSize); std::shared_ptr<TSingleClusterReadSessionImpl> session = Parent->Session.lock(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index e08db09456..5a62afd7e3 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -759,6 +759,8 @@ public: TSingleClusterReadSessionImpl( const TReadSessionSettings& settings, + const TString& database, + const TString& sessionId, const TString& clusterName, const TLog& log, std::shared_ptr<IReadSessionConnectionProcessorFactory> connectionFactory, @@ -768,6 +770,8 @@ public: ui64 partitionStreamIdStart, ui64 partitionStreamIdStep ) : Settings(settings) + , Database(database) + , SessionId(sessionId) , ClusterName(clusterName) , Log(log) , NextPartitionStreamId(partitionStreamIdStart) @@ -808,6 +812,8 @@ public: void DumpStatisticsToLog(TLogElement& log); void UpdateMemoryUsageStatistics(); + TStringBuilder GetLogPrefix() const; + const TLog& GetLog() const { return Log; } @@ -956,6 +962,8 @@ private: private: const TReadSessionSettings Settings; + const TString Database; + const TString SessionId; const TString ClusterName; TLog Log; ui64 NextPartitionStreamId; @@ -1064,6 +1072,8 @@ public: void ClearAllEvents(); private: + TStringBuilder GetLogPrefix() const; + // Start bool ValidateSettings(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 0a338cfc1d..1259126b66 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -85,10 +85,14 @@ void TWriteSession::Start(const TDuration& delay) { TWriteSession::THandleResult 
TWriteSession::RestartImpl(const TPlainStatus& status) { THandleResult result; if (AtomicGet(Aborting)) { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session is aborting and will not restart"; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); return result; } - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues); + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Got error. Status: " << status.Status + << ". Description: " << IssuesSingleLineString(status.Issues) + ); SessionEstablished = false; TMaybe<TDuration> nextDelay = TDuration::Zero(); if (!RetryState) { @@ -99,11 +103,14 @@ TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& stat if (nextDelay) { result.StartDelay = *nextDelay; result.DoRestart = true; - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms"; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" + ); ResetForRetryImpl(); } else { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session will not restart after a fatal error"; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } @@ -116,7 +123,7 @@ bool IsFederation(const TString& endpoint) { } void TWriteSession::DoCdsRequest(TDuration delay) { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session: Do CDS request"; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: Do CDS request"); auto weakThis = weak_from_this(); if ( @@ -149,10 +156,10 @@ void TWriteSession::DoCdsRequest(TDuration delay) { params->set_preferred_cluster_name(*Settings.PreferredCluster_); auto weakConnections = 
std::weak_ptr<TGRpcConnectionsImpl>(Connections); - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"); auto cdsRequestCall = [req_=std::move(req), extr=std::move(extractor), weakConnections, dbState=DbDriverState, settings=Settings]() mutable { if (auto connections = weakConnections.lock()) { - dbState->Log << TLOG_INFO << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"; + dbState->Log.Write(TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"); connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>( @@ -171,7 +178,7 @@ void TWriteSession::DoCdsRequest(TDuration delay) { void TWriteSession::OnCdsResponse( TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result ) { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Got CDS response: \n" << result.ShortDebugString(); + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString()); TString endpoint, name; THandleResult handleResult; if (!status.IsSuccess()) { @@ -266,7 +273,7 @@ void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() { if (Settings.ValidateSeqNo_) { if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { - DbDriverState->Log << TLOG_ERR << LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"; + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); } else @@ -302,17 +309,23 @@ ui64 TWriteSession::GetNextSeqNoImpl(const TMaybe<ui64>& 
seqNo) { } if (seqNo.Defined()) { if (*AutoSeqNoMode) { - DbDriverState->Log << TLOG_ERR << LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"; + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); ThrowFatalError( - "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" ); } else { seqNoValue = *seqNo; } } else if (!(*AutoSeqNoMode)) { - DbDriverState->Log << TLOG_ERR << LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"; + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); ThrowFatalError( - "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); } LastSeqNo = seqNoValue; @@ -378,7 +391,7 @@ TWriteSession::THandleResult TWriteSession::OnErrorImpl(NYdb::TPlainStatus&& sta // No lock void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); NGrpc::IQueueClientContextPtr prevConnectContext; NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; @@ -457,7 +470,7 @@ void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) { // RPC callback. 
void TWriteSession::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { - DbDriverState->Log << TLOG_ERR << LogPrefix() << "Write session: connect timeout"; + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); THandleResult handleResult; with_lock (Lock) { if (ConnectTimeoutContext == connectTimeoutContext) { @@ -536,7 +549,7 @@ void TWriteSession::InitImpl() { for (const auto& attr : Settings.Meta_.Fields) { (*init->mutable_session_meta())[attr.first] = attr.second; } - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: send init request: "<< req.ShortDebugString(); + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); WriteToProcessorImpl(std::move(req)); } @@ -635,7 +648,7 @@ void TWriteSession::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectio ProcessHandleResult(processResult.HandleResult); } -TString TWriteSession::LogPrefix() const { +TStringBuilder TWriteSession::LogPrefix() const { return TStringBuilder() << "MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; } @@ -676,7 +689,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl( } case TServerMessage::kInitResponse: { const auto& initResponse = ServerMessage->init_response(); - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString(); + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. 
Init response: " << initResponse.ShortDebugString()); SessionId = initResponse.session_id(); PartitionId = initResponse.partition_id(); ui64 newLastSeqNo = initResponse.last_sequence_number(); @@ -702,7 +715,10 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl( case TServerMessage::kBatchWriteResponse: { TWriteSessionEvent::TAcksEvent acksEvent; const auto& batchWriteResponse = ServerMessage->batch_write_response(); - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() + ); TWriteStat::TPtr writeStat = new TWriteStat{}; const auto& stat = batchWriteResponse.write_statistics(); writeStat->WriteTime = TDuration::MilliSeconds(stat.persist_duration_ms()); @@ -735,7 +751,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl( } case TServerMessage::kUpdateTokenResponse: { UpdateTokenInProgress = false; - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: token updated successfully"; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); UpdateTokenIfNeededImpl(); break; } @@ -745,7 +761,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl( bool TWriteSession::CleanupOnAcknowledged(ui64 sequenceNumber) { bool result = false; - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: acknoledged message " << sequenceNumber; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); UpdateTimedCountersImpl(); const auto& sentFront = SentOriginalMessages.front(); ui64 size = 0; @@ -797,10 +813,17 @@ TMemoryUsageChange TWriteSession::OnMemoryUsageChangedImpl(i64 diff) { bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; if (wasOk != nowOk) { if 
(wasOk) { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Estimated memory usage " << MemoryUsage << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])"; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage " << MemoryUsage + << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" + ); } else { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]"; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" + ); } } return {wasOk, nowOk}; @@ -918,8 +941,11 @@ void TWriteSession::FlushWriteIfRequiredImpl() { // Involves compression, but still called under lock. size_t TWriteSession::WriteBatchImpl() { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " - << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " + << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo + ); Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount); @@ -991,7 +1017,7 @@ bool TWriteSession::IsReadyToSendNextImpl() const { void TWriteSession::UpdateTokenIfNeededImpl() { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: try to update token"; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress) return; @@ -1004,7 +1030,7 @@ void TWriteSession::UpdateTokenIfNeededImpl() { updateRequest->set_token(token); PrevToken = token; - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: updating token"; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); 
Processor->Write(std::move(clientMessage)); } @@ -1053,9 +1079,12 @@ void TWriteSession::SendImpl() { PackedMessagesToSend.pop(); } UpdateTokenIfNeededImpl(); - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) (" - << OriginalMessagesToSend.size() << " left), first sequence number is " - << writeRequest->sequence_numbers(0); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) (" + << OriginalMessagesToSend.size() << " left), first sequence number is " + << writeRequest->sequence_numbers(0) + ); Processor->Write(std::move(clientMessage)); } } @@ -1064,7 +1093,10 @@ void TWriteSession::SendImpl() { bool TWriteSession::Close(TDuration closeTimeout) { if (AtomicGet(Aborting)) return false; - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms"; + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Write session: close. 
Timeout = " << closeTimeout.MilliSeconds() << " ms" + ); auto startTime = TInstant::Now(); auto remaining = closeTimeout; bool ready = false; @@ -1094,9 +1126,13 @@ bool TWriteSession::Close(TDuration closeTimeout) { InitSeqNoPromise.SetException("session closed"); } if (ready) { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session: gracefully shut down, all writes complete"; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); } else { - DbDriverState->Log << TLOG_WARNING << LogPrefix() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown"; + DbDriverState->Log.Write( + TLOG_WARNING, + LogPrefix() << "Write session: could not confirm all writes in time" + << " or session aborted, perform hard shutdown" + ); } return ready; } @@ -1153,18 +1189,19 @@ void TWriteSession::UpdateTimedCountersImpl() { << Counters->counter->Val() \ /**/ - DbDriverState->Log << TLOG_INFO << LogPrefix() + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Counters: {" - LOG_COUNTER(Errors) - LOG_COUNTER(CurrentSessionLifetimeMs) - LOG_COUNTER(BytesWritten) - LOG_COUNTER(MessagesWritten) - LOG_COUNTER(BytesWrittenCompressed) - LOG_COUNTER(BytesInflightUncompressed) - LOG_COUNTER(BytesInflightCompressed) - LOG_COUNTER(BytesInflightTotal) - LOG_COUNTER(MessagesInflight) - << " }"; + LOG_COUNTER(Errors) + LOG_COUNTER(CurrentSessionLifetimeMs) + LOG_COUNTER(BytesWritten) + LOG_COUNTER(MessagesWritten) + LOG_COUNTER(BytesWrittenCompressed) + LOG_COUNTER(BytesInflightUncompressed) + LOG_COUNTER(BytesInflightCompressed) + LOG_COUNTER(BytesInflightTotal) + LOG_COUNTER(MessagesInflight) + << " }" + ); #undef LOG_COUNTER } @@ -1172,7 +1209,7 @@ void TWriteSession::UpdateTimedCountersImpl() { void TWriteSession::AbortImpl() { if (!AtomicGet(Aborting)) { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: aborting"; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() 
<< "Write session: aborting"); AtomicSet(Aborting, 1); Cancel(ConnectContext); Cancel(ConnectTimeoutContext); @@ -1184,7 +1221,7 @@ void TWriteSession::AbortImpl() { } void TWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session will now close"; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); AbortImpl(); } @@ -1196,13 +1233,13 @@ void TWriteSession::CloseImpl(EStatus statusCode, const TString& message) { } void TWriteSession::CloseImpl(TPlainStatus&& status) { - DbDriverState->Log << TLOG_INFO << LogPrefix() << "Write session will now close"; + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(std::move(status))); AbortImpl(); } TWriteSession::~TWriteSession() { - DbDriverState->Log << TLOG_DEBUG << LogPrefix() << "Write session: destroy"; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); bool needClose = false; with_lock(Lock) { if (!AtomicGet(Aborting)) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index 6f506a0080..05824e111c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -314,7 +314,7 @@ public: private: - TString LogPrefix() const; + TStringBuilder LogPrefix() const; void UpdateTokenIfNeededImpl(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index ee6a5fc64a..e0a6a4a0ca 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -1296,6 +1296,8 @@ struct TReadSessionSettings : public 
TRequestSettings<TReadSessionSettings> { FLUENT_SETTING_VECTOR(TString, Clusters); FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); + + FLUENT_SETTING_OPTIONAL(TLog, Log); }; //! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 002dd47290..3ebfd50a65 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -619,6 +619,8 @@ TSingleClusterReadSessionImpl* TReadSessionImplTestSetup::GetSession() { } Session = std::make_shared<TSingleClusterReadSessionImpl>( Settings, + "db", + "sessionid", ClusterName, Log, MockProcessorFactory, |