blob: 4bff7f55e081eb6cc1eb1799f31f33fb6db3fc7d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
#include "assign_tasks.h"
namespace NKikimr::NBackgroundTasks {
std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TAssignTasksActor::OnSessionId(const TString& sessionId) {
Ydb::Table::ExecuteDataQueryRequest request;
TStringBuilder sb;
const auto now = TActivationContext::Now();
sb << "DECLARE $executorId AS String;" << Endl;
sb << "DECLARE $lastPingCriticalBorder AS Uint32;" << Endl;
sb << "DECLARE $lastPingNewValue AS Uint32;" << Endl;
sb << "$ids = (SELECT id FROM `" << Controller->GetTableName() << "`"
<< " WHERE (lastPing < $lastPingCriticalBorder"
<< " OR executorId IS NULL) AND enabled = true"
<< " LIMIT " << TasksCount << ");" << Endl;
sb << "UPSERT INTO `" + Controller->GetTableName() + "`"
<< " SELECT id, $executorId as executorId, $lastPingNewValue as lastPing"
<< " FROM $ids";
{
auto& param = (*request.mutable_parameters())["$lastPingCriticalBorder"];
param.mutable_value()->set_uint32_value((now - Controller->GetConfig().GetPingCheckPeriod()).Seconds());
param.mutable_type()->set_type_id(Ydb::Type::UINT32);
}
{
auto& param = (*request.mutable_parameters())["$lastPingNewValue"];
param.mutable_value()->set_uint32_value(now.Seconds());
param.mutable_type()->set_type_id(Ydb::Type::UINT32);
}
{
auto& param = (*request.mutable_parameters())["$executorId"];
param.mutable_value()->set_bytes_value(ExecutorId);
param.mutable_type()->set_type_id(Ydb::Type::STRING);
}
request.mutable_query()->set_yql_text(sb);
request.set_session_id(sessionId);
request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
request.mutable_tx_control()->set_commit_tx(true);
return request;
}
void TAssignTasksActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
Controller->OnAssignFinished();
}
}
|