1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#include "logger.h"
#include "log_formatter.h"
#include <util/generic/utility.h>
namespace NKikimr {
namespace NCms {
using namespace NKikimrCms;
TLogger::TLogger(TCmsStatePtr state)
: State(state)
{
}
TString TLogger::GetLogMessage(const NKikimrCms::TLogRecord &rec,
NKikimrCms::ETextFormat format) const
{
switch (format) {
case TEXT_FORMAT_NONE:
return "";
case TEXT_FORMAT_SHORT:
return TLogFormatter<TEXT_FORMAT_SHORT>::Format(rec);
case TEXT_FORMAT_DETAILED:
return TLogFormatter<TEXT_FORMAT_DETAILED>::Format(rec);
default:
return TStringBuilder() << "[unsupported format]" << format;
}
}
bool TLogger::DbCleanupLog(TTransactionContext& txc,
const TActorContext& ctx)
{
NIceDb::TNiceDb db(txc.DB);
TInstant fromDate = ctx.Now() - State->Config.LogConfig.TTL;
ui64 from = Max<ui64>() - fromDate.GetValue();
LOG_DEBUG_S(ctx, NKikimrServices::CMS,
"Cleanup log records until " << fromDate);
auto rowset = db.Table<Schema::LogRecords>().GreaterOrEqual(from)
.Select<Schema::LogRecords::Timestamp>();
if (!rowset.IsReady())
return false;
TVector<ui64> ids;
while (!rowset.EndOfSet()) {
ids.push_back(rowset.GetValue<Schema::LogRecords::Timestamp>());
if (!rowset.Next())
return false;
}
LOG_DEBUG_S(ctx, NKikimrServices::CMS,
"Removing " << ids.size() << " log records");
for (auto id : ids)
db.Table<Schema::LogRecords>().Key(id).Delete();
return true;
}
bool TLogger::DbLoadLogTail(const NKikimrCms::TLogFilter &filter,
TVector<NKikimrCms::TLogRecord> &result,
TTransactionContext& txc)
{
result.clear();
ui64 from = 0;
ui64 to = Max<ui64>() - filter.GetMinTimestamp();
ui64 skip = filter.GetOffset();
ui64 remain = Min<ui32>(filter.GetLimit(), 10000);
ui32 type = filter.GetRecordType();
if (filter.GetMaxTimestamp())
from = Max<ui64>() - filter.GetMaxTimestamp();
NIceDb::TNiceDb db(txc.DB);
auto rowset = db.Table<Schema::LogRecords>().GreaterOrEqual(from)
.Select<Schema::LogRecords::TColumns>();
if (!rowset.IsReady())
return false;
while (remain && !rowset.EndOfSet()) {
auto timestamp = rowset.GetValue<Schema::LogRecords::Timestamp>();
if (timestamp > to)
break;
auto data = rowset.GetValue<Schema::LogRecords::Data>();
if (!type || data.GetRecordType() == type) {
if (skip) {
--skip;
} else {
result.push_back(NKikimrCms::TLogRecord());
result.back().SetTimestamp(Max<ui64>() - timestamp);
result.back().SetRecordType(data.GetRecordType());
result.back().MutableData()->Swap(&data);
--remain;
}
}
if (remain && !rowset.Next())
return false;
}
return true;
}
void TLogger::DbLogData(const TLogRecordData &data,
TTransactionContext& txc,
const TActorContext& ctx)
{
if (!State->Config.IsLogEnabled(data.GetRecordType()))
return;
ui64 timestamp = ctx.Now().GetValue();
if (timestamp <= State->LastLogRecordTimestamp)
timestamp = State->LastLogRecordTimestamp + 1;
State->LastLogRecordTimestamp = timestamp;
LOG_TRACE_S(ctx, NKikimrServices::CMS,
"Add log record to local DB"
<< " timestamp=" << timestamp
<< " data=" << data.ShortDebugString());
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::LogRecords>().Key(Max<ui64>() - timestamp)
.Update<Schema::LogRecords::Data>(data);
}
} // NCms
} // NKikimr
|