diff options
author | snaury <snaury@ydb.tech> | 2023-02-17 16:43:17 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-02-17 16:43:17 +0300 |
commit | 750a5c2f50390814aa092a0af80f86325d782c4b (patch) | |
tree | c1ecd792cfd112362ee5725bff8ce912ab9d4fcc | |
parent | 54f53419c080827f9bb5e422844443dd9557e1d8 (diff) | |
download | ydb-750a5c2f50390814aa092a0af80f86325d782c4b.tar.gz |
Remove support for old read table, no server inactivity timer by default
-rw-r--r-- | ydb/core/grpc_services/rpc_read_table.cpp | 241 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 | ||||
-rw-r--r-- | ydb/core/protos/stream.proto | 2 |
3 files changed, 117 insertions, 128 deletions
diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index 010b5e7cf3..c74c22499c 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -110,9 +110,10 @@ static void ConvertKeyRange(const Ydb::Table::KeyRange& keyRange, const TGetOutp class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> { enum EWakeupTag : ui64 { - TimeoutTimer = 0, RlSendAllowed = 1, - RlNoResource = 2 + RlNoResource = 2, + ClientInactivity = 3, + ServerInactivity = 4, }; public: @@ -136,16 +137,13 @@ public: LastStatusTimestamp_ = ctx.Now(); LastDataStreamTimestamp_ = ctx.Now(); - InactiveClientTimeout_ = TDuration::FromValue(cfg.GetInactiveClientTimeout()); - InactiveServerTimeout_ = TDuration::FromValue(cfg.GetInactiveServerTimeout()); + InactiveClientTimeout_ = TDuration::MicroSeconds(cfg.GetInactiveClientTimeout()); + InactiveServerTimeout_ = TDuration::MicroSeconds(cfg.GetInactiveServerTimeout()); - TDuration timeout; - if (InactiveClientTimeout_) - timeout = InactiveClientTimeout_; - if (InactiveServerTimeout_) - timeout = timeout ? Min(timeout, InactiveServerTimeout_) : InactiveServerTimeout_; - if (timeout) - SetTimeoutTimer(timeout, ctx); + if (InactiveServerTimeout_) { + StartInactivityTimer(InactiveServerTimer_, InactiveServerTimeout_, EWakeupTag::ServerInactivity, ctx); + InactiveServerTimerPending_ = true; + } SendProposeRequest(ctx); @@ -156,7 +154,6 @@ public: as->Send(actorId, new TEvents::TEvPoisonPill()); }; Request_->SetClientLostAction(std::move(clientLostCb)); - } void PassAway() override { @@ -164,6 +161,14 @@ public: Send(ReadTableActor, new TEvents::TEvPoison); ReadTableActor = { }; } + if (InactiveClientTimer_) { + Send(InactiveClientTimer_, new TEvents::TEvPoison); + InactiveClientTimer_ = { }; + } + if (InactiveServerTimer_) { + Send(InactiveServerTimer_, new TEvents::TEvPoison); + InactiveServerTimer_ = { }; + } TActorBootstrapped::PassAway(); } @@ -335,12 +340,18 @@ private: void Handle(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) { switch ((EWakeupTag) ev->Get()->Tag) { - case EWakeupTag::TimeoutTimer: - return ProcessTimeoutTimer(ctx); case EWakeupTag::RlSendAllowed: return ProcessRlSendAllowed(ctx); case EWakeupTag::RlNoResource: return ProcessRlNoResource(ctx); + case EWakeupTag::ClientInactivity: + InactiveClientTimer_ = { }; + InactiveClientTimerPending_ = false; + return ProcessClientInactivity(ctx); + case EWakeupTag::ServerInactivity: + InactiveServerTimer_ = { }; + InactiveServerTimerPending_ = false; + return ProcessServerInactivity(ctx); } } @@ -376,49 +387,53 @@ private: return ReplyFinishStream(Ydb::StatusIds::OVERLOADED, message, ctx); } - void ProcessTimeoutTimer(const TActorContext &ctx) { + void ProcessClientInactivity(const TActorContext &ctx) { + // We don't consider client inactive when there's nothing for the client to read + if (!LeftInGRpcAdaptorQueue_) { + return; + } + TInstant now = ctx.Now(); - TDuration timeout; - - if (InactiveClientTimeout_ && LeftInGRpcAdaptorQueue_) { - TDuration processTime = now - LastDataStreamTimestamp_; - if (processTime >= InactiveClientTimeout_) { - TStringStream ss; - ss << SelfId() << " Client cannot process data in " << processTime - << " which exceeds client timeout " << InactiveClientTimeout_; - LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str()); - return HandleStreamTimeout(ctx, ss.Str()); - } else { - TDuration remain = InactiveClientTimeout_ - processTime; - timeout = timeout ? Min(timeout, remain) : remain; - } + TDuration processTime = now - LastDataStreamTimestamp_; + + if (processTime >= InactiveClientTimeout_) { + TStringStream ss; + ss << SelfId() << " Client cannot process data in " << processTime + << " which exceeds client timeout " << InactiveClientTimeout_; + LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str()); + return HandleStreamTimeout(ctx, ss.Str()); } - if (InactiveServerTimeout_) { - TDuration processTime = now - LastStatusTimestamp_; - // Ignore server timeout if response buffer is full. - if (LeftInGRpcAdaptorQueue_ == QuotaLimit_) { - timeout = timeout ? Min(timeout, InactiveServerTimeout_) : InactiveServerTimeout_; - } else if (processTime >= InactiveServerTimeout_) { - TStringStream ss; - ss << SelfId() - << " Server doesn't provide data for " << processTime - << " which exceeds server timeout " << InactiveServerTimeout_ - << " (QuotaRequestQueue: " << QuotaRequestQueue_.size() - << " ResponseQueue: " << LeftInGRpcAdaptorQueue_ - << " QuotaLimit: " << QuotaLimit_ - << " QuotaReserved: " << QuotaReserved_ - << " QuotaByShard: " << QuotaByShard_.size() << ")"; - LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str()); - return HandleStreamTimeout(ctx, ss.Str()); - } else { - TDuration remain = InactiveServerTimeout_ - processTime; - timeout = timeout ? Min(timeout, remain) : remain; - } + TDuration timeout = InactiveClientTimeout_ - processTime; + StartInactivityTimer(InactiveClientTimer_, timeout, EWakeupTag::ClientInactivity, ctx); + InactiveClientTimerPending_ = true; + } + + void ProcessServerInactivity(const TActorContext &ctx) { + TInstant now = ctx.Now(); + TDuration timeout = InactiveServerTimeout_; + TDuration processTime = now - LastStatusTimestamp_; + // Ignore server timeout if response buffer is full. + if (LeftInGRpcAdaptorQueue_ == QuotaLimit_) { + // nothing + } else if (processTime >= InactiveServerTimeout_) { + TStringStream ss; + ss << SelfId() + << " Server doesn't provide data for " << processTime + << " which exceeds server timeout " << InactiveServerTimeout_ + << " (QuotaRequestQueue: " << QuotaRequestQueue_.size() + << " ResponseQueue: " << LeftInGRpcAdaptorQueue_ + << " QuotaLimit: " << QuotaLimit_ + << " QuotaReserved: " << QuotaReserved_ + << " QuotaByShard: " << QuotaByShard_.size() << ")"; + LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str()); + return HandleStreamTimeout(ctx, ss.Str()); + } else { + timeout = InactiveServerTimeout_ - processTime; } - if (timeout) - SetTimeoutTimer(timeout, ctx); + StartInactivityTimer(InactiveServerTimer_, timeout, EWakeupTag::ServerInactivity, ctx); + InactiveServerTimerPending_ = true; } void Handle(TEvDataShard::TEvGetReadTableStreamStateRequest::TPtr &ev, const TActorContext &ctx) @@ -466,19 +481,10 @@ private: return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx); } - const auto& featureFlags = AppData(ctx)->FeatureFlags; - - bool useSnapshot = featureFlags.GetReadTableWithSnapshot(); + // Snapshots are always enabled and cannot be disabled switch (req->use_snapshot()) { case Ydb::FeatureFlag::STATUS_UNSPECIFIED: - break; - case Ydb::FeatureFlag::ENABLED: - useSnapshot = true; - break; - - case Ydb::FeatureFlag::DISABLED: - useSnapshot = false; break; default: { @@ -492,68 +498,36 @@ private: } } - if (useSnapshot) { - NKikimr::NTxProxy::TReadTableSettings settings; - - if (Request_->GetInternalToken()) { - settings.UserToken = Request_->GetInternalToken(); - } - settings.DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse("")); - - settings.Owner = SelfId(); - settings.TablePath = req->path(); - settings.Ordered = req->ordered(); - settings.RequireResultSet = true; - if (req->row_limit()) { - settings.MaxRows = req->row_limit(); - } - - for (auto &col : req->columns()) { - settings.Columns.push_back(col); - } - - try { - ConvertKeyRange(req->key_range(), [&]{ return &settings.KeyRange; }); - } catch (const std::exception& ex) { - const NYql::TIssue& issue = NYql::ExceptionToIssue(ex); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message; - auto item = message.Add(); - NYql::IssueToMessage(issue, item); - return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx); - } - - ReadTableActor = ctx.RegisterWithSameMailbox(NKikimr::NTxProxy::CreateReadTableSnapshotWorker(settings)); - } else { - auto proposeRequest = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>(); + NKikimr::NTxProxy::TReadTableSettings settings; - SetAuthToken(proposeRequest, *Request_); - SetDatabase(proposeRequest.get(), *Request_); - NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; - record.SetExecTimeoutPeriod(TDuration::Minutes(60).MilliSeconds()); - auto readTransaction = record.MutableTransaction()->MutableReadTableTransaction(); - - record.SetStreamResponse(true); - readTransaction->SetPath(req->path()); - readTransaction->SetOrdered(req->ordered()); - readTransaction->SetRowLimit(req->row_limit()); - - readTransaction->SetApiVersion(NKikimrTxUserProxy::TReadTableTransaction::YDB_V1); + if (Request_->GetInternalToken()) { + settings.UserToken = Request_->GetInternalToken(); + } + settings.DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse("")); + + settings.Owner = SelfId(); + settings.TablePath = req->path(); + settings.Ordered = req->ordered(); + settings.RequireResultSet = true; + if (req->row_limit()) { + settings.MaxRows = req->row_limit(); + } - try { - ConvertKeyRange(req->key_range(), [&]{ return readTransaction->MutableKeyRange(); }); - } catch (const std::exception& ex) { - const NYql::TIssue& issue = NYql::ExceptionToIssue(ex); - google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message; - auto item = message.Add(); - NYql::IssueToMessage(issue, item); - return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx); - } + for (auto &col : req->columns()) { + settings.Columns.push_back(col); + } - for (auto &col : req->columns()) { - readTransaction->AddColumns(col); - } - ctx.Send(MakeTxProxyID(), proposeRequest.release()); + try { + ConvertKeyRange(req->key_range(), [&]{ return &settings.KeyRange; }); + } catch (const std::exception& ex) { + const NYql::TIssue& issue = NYql::ExceptionToIssue(ex); + google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message; + auto item = message.Add(); + NYql::IssueToMessage(issue, item); + return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx); } + + ReadTableActor = ctx.RegisterWithSameMailbox(NKikimr::NTxProxy::CreateReadTableSnapshotWorker(settings)); } void ReplyFinishStream(Ydb::StatusIds::StatusCode status, @@ -598,13 +572,17 @@ private: Die(ctx); } - void SetTimeoutTimer(TDuration timeout, const TActorContext &ctx) { + void StartInactivityTimer(TActorId& timer, TDuration timeout, EWakeupTag tag, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::READ_TABLE_API, - SelfId() << " Set stream timeout timer for " << timeout); + SelfId() << " Starting inactivity timer for " << timeout << " with tag " << int(tag)); + + if (timer) { + ctx.Send(timer, new TEvents::TEvPoison); + timer = {}; + } - auto *ev = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup(EWakeupTag::TimeoutTimer)); - StreamTimerCookieHolder_.Reset(ISchedulerCookie::Make2Way()); - CreateLongTimer(ctx, timeout, ev, 0, StreamTimerCookieHolder_.Get()); + auto *ev = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup(tag)); + timer = CreateLongTimer(timeout, ev); } void HandleStreamTimeout(const TActorContext &ctx, const TString& msg) { @@ -731,6 +709,14 @@ private: Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS); + if (LeftInGRpcAdaptorQueue_ == 0) { + LastDataStreamTimestamp_ = ctx.Now(); + if (!InactiveClientTimerPending_ && InactiveClientTimeout_) { + StartInactivityTimer(InactiveClientTimer_, InactiveClientTimeout_, EWakeupTag::ClientInactivity, ctx); + InactiveClientTimerPending_ = true; + } + } + LeftInGRpcAdaptorQueue_++; if (LeftInGRpcAdaptorQueue_ > QuotaLimit_) { LOG_ERROR_S(*TlsActivationContext, NKikimrServices::READ_TABLE_API, @@ -763,7 +749,10 @@ private: TInstant LastDataStreamTimestamp_; TInstant LastStatusTimestamp_; - TSchedulerCookieHolder StreamTimerCookieHolder_; + TActorId InactiveClientTimer_; + TActorId InactiveServerTimer_; + bool InactiveClientTimerPending_ = false; + bool InactiveServerTimerPending_ = false; struct TBuffEntry { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 5e9ef84363..8b8aebb771 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -697,7 +697,7 @@ message TFeatureFlags { optional bool EnableExternalHive = 17 [default = true]; optional bool UseSchemeBoardCacheForSchemeRequests = 18 [default = true]; // deprecated: always true optional bool CompileMinikqlWithVersion = 19 [default = true]; // deprecated: always true - optional bool ReadTableWithSnapshot = 20 [default = true]; + optional bool ReadTableWithSnapshot = 20 [default = true]; // deprecated: always true optional bool ImportantTabletsUseSystemPool = 21 [default = true]; optional bool EnableOfflineSlaves = 22 [default = true]; // deprecated: always true optional bool CheckDatabaseAccessPermission = 23 [default = false]; diff --git a/ydb/core/protos/stream.proto b/ydb/core/protos/stream.proto index 429148ecec..1ec1547ec0 100644 --- a/ydb/core/protos/stream.proto +++ b/ydb/core/protos/stream.proto @@ -11,7 +11,7 @@ message TStreamingConfig { optional uint32 MaxStreamingShards = 5 [default = 5]; // Timeouts used to interrupt inactive streams. optional uint64 InactiveClientTimeout = 6 [default = 60000000]; - optional uint64 InactiveServerTimeout = 7 [default = 60000000]; + optional uint64 InactiveServerTimeout = 7 [default = 0]; } optional bool EnableInputStreams = 1 [default = true]; |