aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/ymq/actor/executor.h
diff options
context:
space:
mode:
Diffstat (limited to 'ydb/core/ymq/actor/executor.h')
-rw-r--r--ydb/core/ymq/actor/executor.h68
1 files changed, 34 insertions, 34 deletions
diff --git a/ydb/core/ymq/actor/executor.h b/ydb/core/ymq/actor/executor.h
index 5dd51c6da2..befb9ffa05 100644
--- a/ydb/core/ymq/actor/executor.h
+++ b/ydb/core/ymq/actor/executor.h
@@ -1,23 +1,23 @@
-#pragma once
+#pragma once
#include "defs.h"
-#include "events.h"
-
+#include "events.h"
+
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/ymq/actor/params.h>
#include <ydb/core/ymq/base/counters.h>
-
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
-
-#include <util/generic/hash.h>
+
+#include <util/generic/hash.h>
#include <util/generic/maybe.h>
-
+
namespace NKikimr::NSQS {
-
+
// Builds transaction request and properly executes it
// Can either send TEvExecute to queue leader or create execution actor.
class TExecutorBuilder {
@@ -145,26 +145,26 @@ private:
TIntrusivePtr<TTransactionCounters> TransactionCounters_;
};
-class TMiniKqlExecutionActor
+class TMiniKqlExecutionActor
: public TActorBootstrapped<TMiniKqlExecutionActor>
-{
- using TRequest = TEvTxUserProxy::TEvProposeTransaction;
- using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus;
-
-public:
- TMiniKqlExecutionActor(
+{
+ using TRequest = TEvTxUserProxy::TEvProposeTransaction;
+ using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus;
+
+public:
+ TMiniKqlExecutionActor(
const TActorId sender,
TString requestId,
- THolder<TRequest> req,
+ THolder<TRequest> req,
bool retryOnTimeout,
const TQueuePath& path, // queue or user
const TIntrusivePtr<TTransactionCounters>& counters,
TSqsEvents::TExecutedCallback cb = TSqsEvents::TExecutedCallback());
-
+
~TMiniKqlExecutionActor();
void Bootstrap();
-
+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::SQS_EXECUTOR_ACTOR;
}
@@ -173,27 +173,27 @@ public:
QueryId_ = queryId;
}
-private:
+private:
void CompileProgram(bool forceRefresh);
-
+
void ProceedWithExecution();
-
+
TString GetRequestType() const;
TString GetActionType() const;
void LogRequestDuration();
-private:
+private:
STATEFN(AwaitState) {
- switch (ev->GetTypeRewrite()) {
+ switch (ev->GetTypeRewrite()) {
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, HandleResponse);
hFunc(TMiniKQLCompileServiceEvents::TEvCompileStatus, HandleCompile);
hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleResult);
hFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeClientDisconnected);
hFunc(TEvTabletPipe::TEvClientConnected, HandlePipeClientConnected);
hFunc(TEvWakeup, HandleWakeup);
- }
- }
-
+ }
+ }
+
void PassAway();
void HandleCompile(TMiniKQLCompileServiceEvents::TEvCompileStatus::TPtr& ev);
@@ -206,14 +206,14 @@ private:
void HandleResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev);
void HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev);
void HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev);
-
+
TDuration NextAttemptWaitDuration() const;
void ScheduleRetry(bool compilation);
bool RetryTimeoutExpired();
void WaitForCompletion(bool retry = false);
-private:
+private:
enum class EMode {
Compile,
Exec,
@@ -223,11 +223,11 @@ private:
private:
const TActorId Sender_;
const TString RequestId_;
- const TSqsEvents::TExecutedCallback Cb_;
- THolder<TRequest> Request_;
- TString MkqlProgramText_;
+ const TSqsEvents::TExecutedCallback Cb_;
+ THolder<TRequest> Request_;
+ TString MkqlProgramText_;
THashMap<TString, ui64> CompileResolveCookies_;
- bool CompilationPending_ = false;
+ bool CompilationPending_ = false;
size_t CompilationRetries_ = 3;
TMaybe<EQueryId> QueryId_; // information for logging
TInstant StartTs_;
@@ -243,6 +243,6 @@ private:
// Waiting for transaction to complete
TResponse::TPtr ResponseEvent_;
TActorId TabletPipeClient_;
-};
-
+};
+
} // namespace NKikimr::NSQS