diff options
Diffstat (limited to 'ydb/core/ymq/actor/executor.h')
-rw-r--r-- | ydb/core/ymq/actor/executor.h | 68 |
1 files changed, 34 insertions, 34 deletions
diff --git a/ydb/core/ymq/actor/executor.h b/ydb/core/ymq/actor/executor.h index 5dd51c6da2..befb9ffa05 100644 --- a/ydb/core/ymq/actor/executor.h +++ b/ydb/core/ymq/actor/executor.h @@ -1,23 +1,23 @@ -#pragma once +#pragma once #include "defs.h" -#include "events.h" - +#include "events.h" + #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/client/minikql_compile/mkql_compile_service.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/ymq/actor/params.h> #include <ydb/core/ymq/base/counters.h> - + #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/monlib/dynamic_counters/counters.h> - -#include <util/generic/hash.h> + +#include <util/generic/hash.h> #include <util/generic/maybe.h> - + namespace NKikimr::NSQS { - + // Builds transaction request and properly executes it // Can either send TEvExecute to queue leader or create execution actor. class TExecutorBuilder { @@ -145,26 +145,26 @@ private: TIntrusivePtr<TTransactionCounters> TransactionCounters_; }; -class TMiniKqlExecutionActor +class TMiniKqlExecutionActor : public TActorBootstrapped<TMiniKqlExecutionActor> -{ - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus; - -public: - TMiniKqlExecutionActor( +{ + using TRequest = TEvTxUserProxy::TEvProposeTransaction; + using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus; + +public: + TMiniKqlExecutionActor( const TActorId sender, TString requestId, - THolder<TRequest> req, + THolder<TRequest> req, bool retryOnTimeout, const TQueuePath& path, // queue or user const TIntrusivePtr<TTransactionCounters>& counters, TSqsEvents::TExecutedCallback cb = TSqsEvents::TExecutedCallback()); - + ~TMiniKqlExecutionActor(); void Bootstrap(); - + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::SQS_EXECUTOR_ACTOR; } @@ -173,27 +173,27 @@ public: QueryId_ = queryId; } -private: +private: void CompileProgram(bool forceRefresh); - + void ProceedWithExecution(); - + TString GetRequestType() const; TString GetActionType() const; void LogRequestDuration(); -private: +private: STATEFN(AwaitState) { - switch (ev->GetTypeRewrite()) { + switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, HandleResponse); hFunc(TMiniKQLCompileServiceEvents::TEvCompileStatus, HandleCompile); hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleResult); hFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeClientDisconnected); hFunc(TEvTabletPipe::TEvClientConnected, HandlePipeClientConnected); hFunc(TEvWakeup, HandleWakeup); - } - } - + } + } + void PassAway(); void HandleCompile(TMiniKQLCompileServiceEvents::TEvCompileStatus::TPtr& ev); @@ -206,14 +206,14 @@ private: void HandleResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev); void HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev); void HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev); - + TDuration NextAttemptWaitDuration() const; void ScheduleRetry(bool compilation); bool RetryTimeoutExpired(); void WaitForCompletion(bool retry = false); -private: +private: enum class EMode { Compile, Exec, @@ -223,11 +223,11 @@ private: private: const TActorId Sender_; const TString RequestId_; - const TSqsEvents::TExecutedCallback Cb_; - THolder<TRequest> Request_; - TString MkqlProgramText_; + const TSqsEvents::TExecutedCallback Cb_; + THolder<TRequest> Request_; + TString MkqlProgramText_; THashMap<TString, ui64> CompileResolveCookies_; - bool CompilationPending_ = false; + bool CompilationPending_ = false; size_t CompilationRetries_ = 3; TMaybe<EQueryId> QueryId_; // information for logging TInstant StartTs_; @@ -243,6 +243,6 @@ private: // Waiting for transaction to complete TResponse::TPtr ResponseEvent_; TActorId TabletPipeClient_; -}; - +}; + } // namespace NKikimr::NSQS |