diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2023-12-19 17:15:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-19 17:15:26 +0300 |
commit | c1d5952e191bbf524d2e11debbf1a5dee1e8d971 (patch) | |
tree | 21ae354a3c8b26c7a3069093b905166e7ffc325d | |
parent | 34c14763cf90193322e73397f6bbff026ab3c5a7 (diff) | |
download | ydb-c1d5952e191bbf524d2e11debbf1a5dee1e8d971.tar.gz |
fix sequence requests lost and add debugging tools to sequence proxy (#570)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_sequencer_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp | 53 | ||||
-rw-r--r-- | ydb/core/tx/sequenceproxy/sequenceproxy_impl.h | 22 |
6 files changed, 76 insertions, 19 deletions
diff --git a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp index 87616ce4877..3c0c9cc9708 100644 --- a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp @@ -183,7 +183,7 @@ private: finished = (status == NUdf::EFetchStatus::Finish) && (UnprocessedRows == 0); - if (WaitingReplies == 0) { + if (PendingRows.size() > 0 && WaitingReplies == 0) { Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 209224b1975..0eb2c0ee046 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -15,6 +15,8 @@ #include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/session_actor/kqp_tx.h> +#include <ydb/library/actors/core/monotonic_provider.h> + #include <util/generic/noncopyable.h> #include <util/generic/string.h> @@ -33,7 +35,7 @@ public: TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, - NWilson::TTraceId&& traceId, const TString& sessionId) + NWilson::TTraceId&& traceId, const TString& sessionId, TMonotonic startedAt) : QueryId(queryId) , Database(database) , Cluster(cluster) @@ -46,6 +48,7 @@ public: , StartTime(TInstant::Now()) , KeepSession(ev->Get()->GetKeepSession() || longSession) , UserToken(ev->Get()->GetUserToken()) + , StartedAt(startedAt) { RequestEv.reset(ev->Release().Release()); @@ -98,6 +101,7 @@ public: NKqpProto::TKqpStatsQuery Stats; bool KeepSession = false; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + NActors::TMonotonic StartedAt; THashMap<NKikimr::TTableId, ui64> TableVersions; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 8264f7327b2..beb05142a3c 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -218,7 +218,8 @@ public: ev->Get()->SetClientLostAction(selfId, as); QueryState = std::make_shared<TKqpQueryState>( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, - Settings.TableService, Settings.QueryService, std::move(id), SessionId); + Settings.TableService, Settings.QueryService, std::move(id), SessionId, + AppData()->MonotonicTimeProvider->Now()); if (QueryState->UserRequestContext->TraceId.empty()) { QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString(); } @@ -1309,10 +1310,12 @@ public: TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName(); LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId); + TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after " + << (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds() + << " milliseconds."; + if (ExecuterId) { - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>( - msg.GetStatusCode(), - "Request timeout exceeded"); + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason); Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery); } else { const auto& issues = ev->Get()->GetIssues(); diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp index 88b8e1ba60f..be9da3e1504 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp @@ -142,6 +142,7 @@ namespace NSequenceProxy { auto& info = AllocateInFlight[cookie]; info.Database = database; info.PathId = pathId; + Counters->SequenceShardAllocateCount->Collect(cache); Register(new TAllocateActor(SelfId(), cookie, tabletId, pathId, cache)); return cookie; } diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp index d9faf8d5111..ed193ac9b64 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp @@ -1,5 +1,6 @@ #include "sequenceproxy_impl.h" +#include <ydb/core/base/appdata_fwd.h> #include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/library/yql/public/issue/yql_issue_manager.h> @@ -15,7 +16,21 @@ namespace NKikimr { namespace NSequenceProxy { + TSequenceProxyCounters::TSequenceProxyCounters() { + auto group = GetServiceCounters(AppData()->Counters, "proxy"); + SequenceShardAllocateCount = group->GetHistogram( + "SequenceProxy/SequenceShard/AllocateCountPerRequest", + NMonitoring::ExponentialHistogram(20, 2, 1)); + + ErrorsCount = group->GetCounter("SequenceProxy/Errors", true); + RequestCount = group->GetCounter("SequenceProxy/Requests", true); + ResponseCount = group->GetCounter("SequenceProxy/Responses", true); + NextValLatency = group->GetHistogram("SequenceProxy/Latency", + NMonitoring::ExponentialHistogram(20, 2, 1)); + }; + void TSequenceProxy::Bootstrap() { + Counters.Reset(new TSequenceProxyCounters()); LogPrefix = TStringBuilder() << "TSequenceProxy [Node " << SelfId().NodeId() << "] "; Become(&TThis::StateWork); } @@ -30,6 +45,7 @@ namespace NSequenceProxy { request.Sender = ev->Sender; request.Cookie = ev->Cookie; request.UserToken = std::move(msg->UserToken); + request.StartAt = AppData()->MonotonicTimeProvider->Now(); std::visit( [&](const auto& path) { DoNextVal(std::move(request), msg->Database, path); @@ -37,6 +53,21 @@ namespace NSequenceProxy { msg->Path); } + void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { + Counters->ResponseCount->Inc(); + auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds(); + Counters->NextValLatency->Collect(milliseconds); + Counters->ErrorsCount->Inc(); + Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + } + + void TSequenceProxy::Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value) { + Counters->ResponseCount->Inc(); + auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds(); + Counters->NextValLatency->Collect(milliseconds); + Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie); + } + void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) { if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) { info.PendingNextValResolve = std::move(info.NewNextValResolve); @@ -46,12 +77,15 @@ namespace NSequenceProxy { } void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) { + Counters->RequestCount->Inc(); auto& info = Databases[database].SequenceByName[path]; info.NewNextValResolve.emplace_back(std::move(request)); MaybeStartResolve(database, path, info); } void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) { + Counters->RequestCount->Inc(); + auto& info = Databases[database].SequenceByPathId[pathId]; if (!info.ResolveInProgress && (needRefresh || !info.SequenceInfo)) { StartResolve(database, pathId, !info.SequenceInfo); @@ -77,14 +111,13 @@ namespace NSequenceProxy { OnChanged(database, pathId, info); } - void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { auto& info = Databases[database].SequenceByName[path]; Y_ABORT_UNLESS(info.ResolveInProgress); info.ResolveInProgress = false; while (!info.PendingNextValResolve.empty()) { - const auto& request = info.PendingNextValResolve.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + Reply(info.PendingNextValResolve.front(), status, issues); info.PendingNextValResolve.pop_front(); } @@ -111,14 +144,13 @@ namespace NSequenceProxy { MaybeStartResolve(database, path, info); } - void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { auto& info = Databases[database].SequenceByPathId[pathId]; Y_ABORT_UNLESS(info.ResolveInProgress); info.ResolveInProgress = false; while (!info.PendingNextValResolve.empty()) { - const auto& request = info.PendingNextValResolve.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + Reply(info.PendingNextValResolve.front(), status, issues); info.PendingNextValResolve.pop_front(); } } @@ -144,7 +176,7 @@ namespace NSequenceProxy { info.PendingNextVal.emplace_back(std::move(request)); ++info.TotalRequested; } - resolved.pop_back(); + resolved.pop_front(); } OnChanged(database, pathId, info); @@ -173,8 +205,7 @@ namespace NSequenceProxy { } else { // We will answer up to cache requests with this error while (cache > 0 && !info.PendingNextVal.empty()) { - const auto& request = info.PendingNextVal.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie); + Reply(info.PendingNextVal.front(), msg->Status, msg->Issues); info.PendingNextVal.pop_front(); --info.TotalRequested; --cache; @@ -209,7 +240,7 @@ namespace NSequenceProxy { << "Access denied for " << request.UserToken->GetUserSID() << " to sequence " << pathId; NYql::TIssueManager issueManager; issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error)); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues())); + Reply(request, Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues()); return true; } } @@ -226,7 +257,7 @@ namespace NSequenceProxy { Y_ABORT_UNLESS(!info.CachedAllocations.empty()); auto& front = info.CachedAllocations.front(); Y_ABORT_UNLESS(front.Count > 0); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, front.Start), 0, request.Cookie); + Reply(request, pathId, front.Start); --info.TotalCached; if (--front.Count > 0) { front.Start += front.Increment; diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h index 2e9ab3ce56e..65e9f8993c8 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h @@ -5,12 +5,26 @@ #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/sequenceproxy/public/events.h> +#include <ydb/core/base/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/monotonic_provider.h> namespace NKikimr { namespace NSequenceProxy { + struct TSequenceProxyCounters : TAtomicRefCount<TSequenceProxyCounters> { + ::NMonitoring::TDynamicCounters::TCounterPtr RequestCount; + ::NMonitoring::TDynamicCounters::TCounterPtr ResponseCount; + ::NMonitoring::TDynamicCounters::TCounterPtr ErrorsCount; + + ::NMonitoring::THistogramPtr SequenceShardAllocateCount; + ::NMonitoring::THistogramPtr NextValLatency; + + TSequenceProxyCounters(); + }; + class TSequenceProxy : public TActorBootstrapped<TSequenceProxy> { public: TSequenceProxy(const TSequenceProxySettings& settings) @@ -58,6 +72,7 @@ namespace NSequenceProxy { TActorId Sender; ui64 Cookie; TIntrusivePtr<NACLib::TUserToken> UserToken; + TMonotonic StartAt; }; struct TCachedAllocation { @@ -127,13 +142,15 @@ namespace NSequenceProxy { void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev); void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); + void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); + void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value); ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion); ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache); void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true); - void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); + void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); + void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result); void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result); void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved); @@ -148,6 +165,7 @@ namespace NSequenceProxy { THashMap<ui64, TResolveInFlight> ResolveInFlight; THashMap<ui64, TAllocateInFlight> AllocateInFlight; ui64 LastCookie = 0; + TIntrusivePtr<TSequenceProxyCounters> Counters; }; } // namespace NSequenceProxy |