// Source path: root/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
// Blob: 4bff7f55e081eb6cc1eb1799f31f33fb6db3fc7d
#include "assign_tasks.h"

namespace NKikimr::NBackgroundTasks {

std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TAssignTasksActor::OnSessionId(const TString& sessionId) {
    Ydb::Table::ExecuteDataQueryRequest request;
    TStringBuilder sb;
    const auto now = TActivationContext::Now();
    sb << "DECLARE $executorId AS String;" << Endl;
    sb << "DECLARE $lastPingCriticalBorder AS Uint32;" << Endl;
    sb << "DECLARE $lastPingNewValue AS Uint32;" << Endl;
    sb << "$ids = (SELECT id FROM `" << Controller->GetTableName() << "`"
        << " WHERE (lastPing < $lastPingCriticalBorder"
        << " OR executorId IS NULL) AND enabled = true"
        << " LIMIT " << TasksCount << ");" << Endl;
    sb << "UPSERT INTO `" + Controller->GetTableName() + "`"
        << " SELECT id, $executorId as executorId, $lastPingNewValue as lastPing"
        << " FROM $ids";
    {
        auto& param = (*request.mutable_parameters())["$lastPingCriticalBorder"];
        param.mutable_value()->set_uint32_value((now - Controller->GetConfig().GetPingCheckPeriod()).Seconds());
        param.mutable_type()->set_type_id(Ydb::Type::UINT32);
    }
    {
        auto& param = (*request.mutable_parameters())["$lastPingNewValue"];
        param.mutable_value()->set_uint32_value(now.Seconds());
        param.mutable_type()->set_type_id(Ydb::Type::UINT32);
    }
    {
        auto& param = (*request.mutable_parameters())["$executorId"];
        param.mutable_value()->set_bytes_value(ExecutorId);
        param.mutable_type()->set_type_id(Ydb::Type::STRING);
    }
    request.mutable_query()->set_yql_text(sb);
    request.set_session_id(sessionId);
    request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
    request.mutable_tx_control()->set_commit_tx(true);
    return request;
}

// Called with the response to the assignment query. The response itself is not
// inspected; the controller is simply told that the assignment has finished.
void TAssignTasksActor::OnResult([[maybe_unused]] const NMetadata::NRequest::TDialogYQLRequest::TResponse& result) {
    Controller->OnAssignFinished();
}

}