about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Vitalii Gridnev <gridnevvvit@gmail.com> 2023-12-19 17:15:26 +0300
committer GitHub <noreply@github.com> 2023-12-19 17:15:26 +0300
commit c1d5952e191bbf524d2e11debbf1a5dee1e8d971 (patch)
tree 21ae354a3c8b26c7a3069093b905166e7ffc325d
parent 34c14763cf90193322e73397f6bbff026ab3c5a7 (diff)
download ydb-c1d5952e191bbf524d2e11debbf1a5dee1e8d971.tar.gz
fix sequence requests lost and add debugging tools to sequence proxy (#570)
-rw-r--r-- ydb/core/kqp/runtime/kqp_sequencer_actor.cpp 2
-rw-r--r-- ydb/core/kqp/session_actor/kqp_query_state.h 6
-rw-r--r-- ydb/core/kqp/session_actor/kqp_session_actor.cpp 11
-rw-r--r-- ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp 1
-rw-r--r-- ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp 53
-rw-r--r-- ydb/core/tx/sequenceproxy/sequenceproxy_impl.h 22
6 files changed, 76 insertions, 19 deletions
diff --git a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
index 87616ce4877..3c0c9cc9708 100644
--- a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
@@ -183,7 +183,7 @@ private:
finished = (status == NUdf::EFetchStatus::Finish)
&& (UnprocessedRows == 0);
- if (WaitingReplies == 0) {
+ if (PendingRows.size() > 0 && WaitingReplies == 0) {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 209224b1975..0eb2c0ee046 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -15,6 +15,8 @@
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/session_actor/kqp_tx.h>
+#include <ydb/library/actors/core/monotonic_provider.h>
+
#include <util/generic/noncopyable.h>
#include <util/generic/string.h>
@@ -33,7 +35,7 @@ public:
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
- NWilson::TTraceId&& traceId, const TString& sessionId)
+ NWilson::TTraceId&& traceId, const TString& sessionId, TMonotonic startedAt)
: QueryId(queryId)
, Database(database)
, Cluster(cluster)
@@ -46,6 +48,7 @@ public:
, StartTime(TInstant::Now())
, KeepSession(ev->Get()->GetKeepSession() || longSession)
, UserToken(ev->Get()->GetUserToken())
+ , StartedAt(startedAt)
{
RequestEv.reset(ev->Release().Release());
@@ -98,6 +101,7 @@ public:
NKqpProto::TKqpStatsQuery Stats;
bool KeepSession = false;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ NActors::TMonotonic StartedAt;
THashMap<NKikimr::TTableId, ui64> TableVersions;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 8264f7327b2..beb05142a3c 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -218,7 +218,8 @@ public:
ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
- Settings.TableService, Settings.QueryService, std::move(id), SessionId);
+ Settings.TableService, Settings.QueryService, std::move(id), SessionId,
+ AppData()->MonotonicTimeProvider->Now());
if (QueryState->UserRequestContext->TraceId.empty()) {
QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
}
@@ -1309,10 +1310,12 @@ public:
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
+ TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
+ << (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
+ << " milliseconds.";
+
if (ExecuterId) {
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(
- msg.GetStatusCode(),
- "Request timeout exceeded");
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
} else {
const auto& issues = ev->Get()->GetIssues();
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
index 88b8e1ba60f..be9da3e1504 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
@@ -142,6 +142,7 @@ namespace NSequenceProxy {
auto& info = AllocateInFlight[cookie];
info.Database = database;
info.PathId = pathId;
+ Counters->SequenceShardAllocateCount->Collect(cache);
Register(new TAllocateActor(SelfId(), cookie, tabletId, pathId, cache));
return cookie;
}
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
index d9faf8d5111..ed193ac9b64 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
@@ -1,5 +1,6 @@
#include "sequenceproxy_impl.h"
+#include <ydb/core/base/appdata_fwd.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/library/yql/public/issue/yql_issue_manager.h>
@@ -15,7 +16,21 @@
namespace NKikimr {
namespace NSequenceProxy {
+ TSequenceProxyCounters::TSequenceProxyCounters() { // Obtains every sequence proxy metric from the "proxy" service counters group.
+ auto group = GetServiceCounters(AppData()->Counters, "proxy");
+ SequenceShardAllocateCount = group->GetHistogram( // NOTE(review): ExponentialHistogram(20, 2, 1) presumably means 20 buckets, base 2, scale 1 - confirm against monlib.
+ "SequenceProxy/SequenceShard/AllocateCountPerRequest",
+ NMonitoring::ExponentialHistogram(20, 2, 1));
+
+ ErrorsCount = group->GetCounter("SequenceProxy/Errors", true); // NOTE(review): the `true` argument presumably requests a derivative (rate) counter - confirm.
+ RequestCount = group->GetCounter("SequenceProxy/Requests", true);
+ ResponseCount = group->GetCounter("SequenceProxy/Responses", true);
+ NextValLatency = group->GetHistogram("SequenceProxy/Latency", // Same bucket layout as the allocate-count histogram above.
+ NMonitoring::ExponentialHistogram(20, 2, 1));
+ }
+
void TSequenceProxy::Bootstrap() {
+ Counters.Reset(new TSequenceProxyCounters()); // Allocate the metric handles here; Reply() and DoNextVal() dereference Counters unconditionally.
LogPrefix = TStringBuilder() << "TSequenceProxy [Node " << SelfId().NodeId() << "] ";
Become(&TThis::StateWork);
}
@@ -30,6 +45,7 @@ namespace NSequenceProxy {
request.Sender = ev->Sender;
request.Cookie = ev->Cookie;
request.UserToken = std::move(msg->UserToken);
+ request.StartAt = AppData()->MonotonicTimeProvider->Now();
std::visit(
[&](const auto& path) {
DoNextVal(std::move(request), msg->Database, path);
@@ -37,6 +53,21 @@ namespace NSequenceProxy {
msg->Path);
}
+ void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { // Error reply: updates response, error and latency metrics, then answers the sender with the request's cookie.
+ const auto elapsed = AppData()->MonotonicTimeProvider->Now() - request.StartAt; // Time since request.StartAt was stamped.
+ Counters->ResponseCount->Inc();
+ Counters->ErrorsCount->Inc(); // This overload is only used for failures, so every call counts as an error.
+ Counters->NextValLatency->Collect(elapsed.MilliSeconds());
+ Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
+ }
+
+ void TSequenceProxy::Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value) { // Success reply: updates response and latency metrics, then sends the allocated value with the request's cookie.
+ const auto elapsed = AppData()->MonotonicTimeProvider->Now() - request.StartAt; // Time since request.StartAt was stamped.
+ Counters->ResponseCount->Inc();
+ Counters->NextValLatency->Collect(elapsed.MilliSeconds());
+ Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie);
+ }
+
void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) {
if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) {
info.PendingNextValResolve = std::move(info.NewNextValResolve);
@@ -46,12 +77,15 @@ namespace NSequenceProxy {
}
void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) {
+ Counters->RequestCount->Inc(); // Counted before the request is queued for name resolution. NOTE(review): the TPathId overload increments RequestCount too - confirm name-resolved requests are not re-dispatched through it and counted twice.
auto& info = Databases[database].SequenceByName[path];
info.NewNextValResolve.emplace_back(std::move(request));
MaybeStartResolve(database, path, info);
}
void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) {
+ Counters->RequestCount->Inc();
+
auto& info = Databases[database].SequenceByPathId[pathId];
if (!info.ResolveInProgress && (needRefresh || !info.SequenceInfo)) {
StartResolve(database, pathId, !info.SequenceInfo);
@@ -77,14 +111,13 @@ namespace NSequenceProxy {
OnChanged(database, pathId, info);
}
- void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
+ void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
auto& info = Databases[database].SequenceByName[path];
Y_ABORT_UNLESS(info.ResolveInProgress);
info.ResolveInProgress = false;
while (!info.PendingNextValResolve.empty()) {
- const auto& request = info.PendingNextValResolve.front();
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
+ Reply(info.PendingNextValResolve.front(), status, issues);
info.PendingNextValResolve.pop_front();
}
@@ -111,14 +144,13 @@ namespace NSequenceProxy {
MaybeStartResolve(database, path, info);
}
- void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
+ void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { // Fails every request pending on this path id; issues are only read and are passed unchanged to each Reply() call, hence const&.
auto& info = Databases[database].SequenceByPathId[pathId];
Y_ABORT_UNLESS(info.ResolveInProgress);
info.ResolveInProgress = false;
while (!info.PendingNextValResolve.empty()) {
- const auto& request = info.PendingNextValResolve.front();
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
+ Reply(info.PendingNextValResolve.front(), status, issues); // Reply() sends the same result event with request.Cookie and also updates the response/error/latency metrics.
info.PendingNextValResolve.pop_front();
}
}
@@ -144,7 +176,7 @@ namespace NSequenceProxy {
info.PendingNextVal.emplace_back(std::move(request));
++info.TotalRequested;
}
- resolved.pop_back();
+ resolved.pop_front();
}
OnChanged(database, pathId, info);
@@ -173,8 +205,7 @@ namespace NSequenceProxy {
} else {
// We will answer up to cache requests with this error
while (cache > 0 && !info.PendingNextVal.empty()) {
- const auto& request = info.PendingNextVal.front();
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie);
+ Reply(info.PendingNextVal.front(), msg->Status, msg->Issues);
info.PendingNextVal.pop_front();
--info.TotalRequested;
--cache;
@@ -209,7 +240,7 @@ namespace NSequenceProxy {
<< "Access denied for " << request.UserToken->GetUserSID() << " to sequence " << pathId;
NYql::TIssueManager issueManager;
issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error));
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues()));
+ Reply(request, Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues());
return true;
}
}
@@ -226,7 +257,7 @@ namespace NSequenceProxy {
Y_ABORT_UNLESS(!info.CachedAllocations.empty());
auto& front = info.CachedAllocations.front();
Y_ABORT_UNLESS(front.Count > 0);
- Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, front.Start), 0, request.Cookie);
+ Reply(request, pathId, front.Start);
--info.TotalCached;
if (--front.Count > 0) {
front.Start += front.Increment;
diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
index 2e9ab3ce56e..65e9f8993c8 100644
--- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
+++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
@@ -5,12 +5,26 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/sequenceproxy/public/events.h>
+#include <ydb/core/base/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/actors/core/monotonic_provider.h>
namespace NKikimr {
namespace NSequenceProxy {
+ struct TSequenceProxyCounters : TAtomicRefCount<TSequenceProxyCounters> { // Metric handles used by TSequenceProxy; held through TIntrusivePtr.
+ ::NMonitoring::TDynamicCounters::TCounterPtr RequestCount; // "SequenceProxy/Requests": incremented on entry to DoNextVal.
+ ::NMonitoring::TDynamicCounters::TCounterPtr ResponseCount; // "SequenceProxy/Responses": incremented by both Reply() overloads.
+ ::NMonitoring::TDynamicCounters::TCounterPtr ErrorsCount; // "SequenceProxy/Errors": incremented only by the status/issues Reply() overload.
+
+ ::NMonitoring::THistogramPtr SequenceShardAllocateCount; // "SequenceProxy/SequenceShard/AllocateCountPerRequest" histogram.
+ ::NMonitoring::THistogramPtr NextValLatency; // "SequenceProxy/Latency" histogram; Reply() collects milliseconds into it.
+
+ TSequenceProxyCounters(); // Defined in sequenceproxy_impl.cpp; fills in all of the handles above.
+ };
+
class TSequenceProxy : public TActorBootstrapped<TSequenceProxy> {
public:
TSequenceProxy(const TSequenceProxySettings& settings)
@@ -58,6 +72,7 @@ namespace NSequenceProxy {
TActorId Sender;
ui64 Cookie;
TIntrusivePtr<NACLib::TUserToken> UserToken;
+ TMonotonic StartAt;
};
struct TCachedAllocation {
@@ -127,13 +142,15 @@ namespace NSequenceProxy {
void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+ void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
+ void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value);
ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion);
ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache);
void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true);
- void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
- void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
+ void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
+ void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result);
void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result);
void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved);
@@ -148,6 +165,7 @@ namespace NSequenceProxy {
THashMap<ui64, TResolveInFlight> ResolveInFlight;
THashMap<ui64, TAllocateInFlight> AllocateInFlight;
ui64 LastCookie = 0;
+ TIntrusivePtr<TSequenceProxyCounters> Counters;
};
} // namespace NSequenceProxy