aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-02-17 16:43:17 +0300
committersnaury <snaury@ydb.tech>2023-02-17 16:43:17 +0300
commit750a5c2f50390814aa092a0af80f86325d782c4b (patch)
treec1ecd792cfd112362ee5725bff8ce912ab9d4fcc
parent54f53419c080827f9bb5e422844443dd9557e1d8 (diff)
downloadydb-750a5c2f50390814aa092a0af80f86325d782c4b.tar.gz
Remove support for old read table, no server inactivity timer by default
-rw-r--r--ydb/core/grpc_services/rpc_read_table.cpp241
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/core/protos/stream.proto2
3 files changed, 117 insertions, 128 deletions
diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp
index 010b5e7cf3..c74c22499c 100644
--- a/ydb/core/grpc_services/rpc_read_table.cpp
+++ b/ydb/core/grpc_services/rpc_read_table.cpp
@@ -110,9 +110,10 @@ static void ConvertKeyRange(const Ydb::Table::KeyRange& keyRange, const TGetOutp
class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
enum EWakeupTag : ui64 {
- TimeoutTimer = 0,
RlSendAllowed = 1,
- RlNoResource = 2
+ RlNoResource = 2,
+ ClientInactivity = 3,
+ ServerInactivity = 4,
};
public:
@@ -136,16 +137,13 @@ public:
LastStatusTimestamp_ = ctx.Now();
LastDataStreamTimestamp_ = ctx.Now();
- InactiveClientTimeout_ = TDuration::FromValue(cfg.GetInactiveClientTimeout());
- InactiveServerTimeout_ = TDuration::FromValue(cfg.GetInactiveServerTimeout());
+ InactiveClientTimeout_ = TDuration::MicroSeconds(cfg.GetInactiveClientTimeout());
+ InactiveServerTimeout_ = TDuration::MicroSeconds(cfg.GetInactiveServerTimeout());
- TDuration timeout;
- if (InactiveClientTimeout_)
- timeout = InactiveClientTimeout_;
- if (InactiveServerTimeout_)
- timeout = timeout ? Min(timeout, InactiveServerTimeout_) : InactiveServerTimeout_;
- if (timeout)
- SetTimeoutTimer(timeout, ctx);
+ if (InactiveServerTimeout_) {
+ StartInactivityTimer(InactiveServerTimer_, InactiveServerTimeout_, EWakeupTag::ServerInactivity, ctx);
+ InactiveServerTimerPending_ = true;
+ }
SendProposeRequest(ctx);
@@ -156,7 +154,6 @@ public:
as->Send(actorId, new TEvents::TEvPoisonPill());
};
Request_->SetClientLostAction(std::move(clientLostCb));
-
}
void PassAway() override {
@@ -164,6 +161,14 @@ public:
Send(ReadTableActor, new TEvents::TEvPoison);
ReadTableActor = { };
}
+ if (InactiveClientTimer_) {
+ Send(InactiveClientTimer_, new TEvents::TEvPoison);
+ InactiveClientTimer_ = { };
+ }
+ if (InactiveServerTimer_) {
+ Send(InactiveServerTimer_, new TEvents::TEvPoison);
+ InactiveServerTimer_ = { };
+ }
TActorBootstrapped::PassAway();
}
@@ -335,12 +340,18 @@ private:
void Handle(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) {
switch ((EWakeupTag) ev->Get()->Tag) {
- case EWakeupTag::TimeoutTimer:
- return ProcessTimeoutTimer(ctx);
case EWakeupTag::RlSendAllowed:
return ProcessRlSendAllowed(ctx);
case EWakeupTag::RlNoResource:
return ProcessRlNoResource(ctx);
+ case EWakeupTag::ClientInactivity:
+ InactiveClientTimer_ = { };
+ InactiveClientTimerPending_ = false;
+ return ProcessClientInactivity(ctx);
+ case EWakeupTag::ServerInactivity:
+ InactiveServerTimer_ = { };
+ InactiveServerTimerPending_ = false;
+ return ProcessServerInactivity(ctx);
}
}
@@ -376,49 +387,53 @@ private:
return ReplyFinishStream(Ydb::StatusIds::OVERLOADED, message, ctx);
}
- void ProcessTimeoutTimer(const TActorContext &ctx) {
+ void ProcessClientInactivity(const TActorContext &ctx) {
+ // We don't consider client inactive when there's nothing for the client to read
+ if (!LeftInGRpcAdaptorQueue_) {
+ return;
+ }
+
TInstant now = ctx.Now();
- TDuration timeout;
-
- if (InactiveClientTimeout_ && LeftInGRpcAdaptorQueue_) {
- TDuration processTime = now - LastDataStreamTimestamp_;
- if (processTime >= InactiveClientTimeout_) {
- TStringStream ss;
- ss << SelfId() << " Client cannot process data in " << processTime
- << " which exceeds client timeout " << InactiveClientTimeout_;
- LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str());
- return HandleStreamTimeout(ctx, ss.Str());
- } else {
- TDuration remain = InactiveClientTimeout_ - processTime;
- timeout = timeout ? Min(timeout, remain) : remain;
- }
+ TDuration processTime = now - LastDataStreamTimestamp_;
+
+ if (processTime >= InactiveClientTimeout_) {
+ TStringStream ss;
+ ss << SelfId() << " Client cannot process data in " << processTime
+ << " which exceeds client timeout " << InactiveClientTimeout_;
+ LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str());
+ return HandleStreamTimeout(ctx, ss.Str());
}
- if (InactiveServerTimeout_) {
- TDuration processTime = now - LastStatusTimestamp_;
- // Ignore server timeout if response buffer is full.
- if (LeftInGRpcAdaptorQueue_ == QuotaLimit_) {
- timeout = timeout ? Min(timeout, InactiveServerTimeout_) : InactiveServerTimeout_;
- } else if (processTime >= InactiveServerTimeout_) {
- TStringStream ss;
- ss << SelfId()
- << " Server doesn't provide data for " << processTime
- << " which exceeds server timeout " << InactiveServerTimeout_
- << " (QuotaRequestQueue: " << QuotaRequestQueue_.size()
- << " ResponseQueue: " << LeftInGRpcAdaptorQueue_
- << " QuotaLimit: " << QuotaLimit_
- << " QuotaReserved: " << QuotaReserved_
- << " QuotaByShard: " << QuotaByShard_.size() << ")";
- LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str());
- return HandleStreamTimeout(ctx, ss.Str());
- } else {
- TDuration remain = InactiveServerTimeout_ - processTime;
- timeout = timeout ? Min(timeout, remain) : remain;
- }
+ TDuration timeout = InactiveClientTimeout_ - processTime;
+ StartInactivityTimer(InactiveClientTimer_, timeout, EWakeupTag::ClientInactivity, ctx);
+ InactiveClientTimerPending_ = true;
+ }
+
+ void ProcessServerInactivity(const TActorContext &ctx) {
+ TInstant now = ctx.Now();
+ TDuration timeout = InactiveServerTimeout_;
+ TDuration processTime = now - LastStatusTimestamp_;
+ // Ignore server timeout if response buffer is full.
+ if (LeftInGRpcAdaptorQueue_ == QuotaLimit_) {
+ // nothing
+ } else if (processTime >= InactiveServerTimeout_) {
+ TStringStream ss;
+ ss << SelfId()
+ << " Server doesn't provide data for " << processTime
+ << " which exceeds server timeout " << InactiveServerTimeout_
+ << " (QuotaRequestQueue: " << QuotaRequestQueue_.size()
+ << " ResponseQueue: " << LeftInGRpcAdaptorQueue_
+ << " QuotaLimit: " << QuotaLimit_
+ << " QuotaReserved: " << QuotaReserved_
+ << " QuotaByShard: " << QuotaByShard_.size() << ")";
+ LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, ss.Str());
+ return HandleStreamTimeout(ctx, ss.Str());
+ } else {
+ timeout = InactiveServerTimeout_ - processTime;
}
- if (timeout)
- SetTimeoutTimer(timeout, ctx);
+ StartInactivityTimer(InactiveServerTimer_, timeout, EWakeupTag::ServerInactivity, ctx);
+ InactiveServerTimerPending_ = true;
}
void Handle(TEvDataShard::TEvGetReadTableStreamStateRequest::TPtr &ev, const TActorContext &ctx)
@@ -466,19 +481,10 @@ private:
return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx);
}
- const auto& featureFlags = AppData(ctx)->FeatureFlags;
-
- bool useSnapshot = featureFlags.GetReadTableWithSnapshot();
+ // Snapshots are always enabled and cannot be disabled
switch (req->use_snapshot()) {
case Ydb::FeatureFlag::STATUS_UNSPECIFIED:
- break;
-
case Ydb::FeatureFlag::ENABLED:
- useSnapshot = true;
- break;
-
- case Ydb::FeatureFlag::DISABLED:
- useSnapshot = false;
break;
default: {
@@ -492,68 +498,36 @@ private:
}
}
- if (useSnapshot) {
- NKikimr::NTxProxy::TReadTableSettings settings;
-
- if (Request_->GetInternalToken()) {
- settings.UserToken = Request_->GetInternalToken();
- }
- settings.DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse(""));
-
- settings.Owner = SelfId();
- settings.TablePath = req->path();
- settings.Ordered = req->ordered();
- settings.RequireResultSet = true;
- if (req->row_limit()) {
- settings.MaxRows = req->row_limit();
- }
-
- for (auto &col : req->columns()) {
- settings.Columns.push_back(col);
- }
-
- try {
- ConvertKeyRange(req->key_range(), [&]{ return &settings.KeyRange; });
- } catch (const std::exception& ex) {
- const NYql::TIssue& issue = NYql::ExceptionToIssue(ex);
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message;
- auto item = message.Add();
- NYql::IssueToMessage(issue, item);
- return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx);
- }
-
- ReadTableActor = ctx.RegisterWithSameMailbox(NKikimr::NTxProxy::CreateReadTableSnapshotWorker(settings));
- } else {
- auto proposeRequest = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
+ NKikimr::NTxProxy::TReadTableSettings settings;
- SetAuthToken(proposeRequest, *Request_);
- SetDatabase(proposeRequest.get(), *Request_);
- NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
- record.SetExecTimeoutPeriod(TDuration::Minutes(60).MilliSeconds());
- auto readTransaction = record.MutableTransaction()->MutableReadTableTransaction();
-
- record.SetStreamResponse(true);
- readTransaction->SetPath(req->path());
- readTransaction->SetOrdered(req->ordered());
- readTransaction->SetRowLimit(req->row_limit());
-
- readTransaction->SetApiVersion(NKikimrTxUserProxy::TReadTableTransaction::YDB_V1);
+ if (Request_->GetInternalToken()) {
+ settings.UserToken = Request_->GetInternalToken();
+ }
+ settings.DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse(""));
+
+ settings.Owner = SelfId();
+ settings.TablePath = req->path();
+ settings.Ordered = req->ordered();
+ settings.RequireResultSet = true;
+ if (req->row_limit()) {
+ settings.MaxRows = req->row_limit();
+ }
- try {
- ConvertKeyRange(req->key_range(), [&]{ return readTransaction->MutableKeyRange(); });
- } catch (const std::exception& ex) {
- const NYql::TIssue& issue = NYql::ExceptionToIssue(ex);
- google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message;
- auto item = message.Add();
- NYql::IssueToMessage(issue, item);
- return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx);
- }
+ for (auto &col : req->columns()) {
+ settings.Columns.push_back(col);
+ }
- for (auto &col : req->columns()) {
- readTransaction->AddColumns(col);
- }
- ctx.Send(MakeTxProxyID(), proposeRequest.release());
+ try {
+ ConvertKeyRange(req->key_range(), [&]{ return &settings.KeyRange; });
+ } catch (const std::exception& ex) {
+ const NYql::TIssue& issue = NYql::ExceptionToIssue(ex);
+ google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message;
+ auto item = message.Add();
+ NYql::IssueToMessage(issue, item);
+ return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx);
}
+
+ ReadTableActor = ctx.RegisterWithSameMailbox(NKikimr::NTxProxy::CreateReadTableSnapshotWorker(settings));
}
void ReplyFinishStream(Ydb::StatusIds::StatusCode status,
@@ -598,13 +572,17 @@ private:
Die(ctx);
}
- void SetTimeoutTimer(TDuration timeout, const TActorContext &ctx) {
+ void StartInactivityTimer(TActorId& timer, TDuration timeout, EWakeupTag tag, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::READ_TABLE_API,
- SelfId() << " Set stream timeout timer for " << timeout);
+ SelfId() << " Starting inactivity timer for " << timeout << " with tag " << int(tag));
+
+ if (timer) {
+ ctx.Send(timer, new TEvents::TEvPoison);
+ timer = {};
+ }
- auto *ev = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup(EWakeupTag::TimeoutTimer));
- StreamTimerCookieHolder_.Reset(ISchedulerCookie::Make2Way());
- CreateLongTimer(ctx, timeout, ev, 0, StreamTimerCookieHolder_.Get());
+ auto *ev = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup(tag));
+ timer = CreateLongTimer(timeout, ev);
}
void HandleStreamTimeout(const TActorContext &ctx, const TString& msg) {
@@ -731,6 +709,14 @@ private:
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
+ if (LeftInGRpcAdaptorQueue_ == 0) {
+ LastDataStreamTimestamp_ = ctx.Now();
+ if (!InactiveClientTimerPending_ && InactiveClientTimeout_) {
+ StartInactivityTimer(InactiveClientTimer_, InactiveClientTimeout_, EWakeupTag::ClientInactivity, ctx);
+ InactiveClientTimerPending_ = true;
+ }
+ }
+
LeftInGRpcAdaptorQueue_++;
if (LeftInGRpcAdaptorQueue_ > QuotaLimit_) {
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::READ_TABLE_API,
@@ -763,7 +749,10 @@ private:
TInstant LastDataStreamTimestamp_;
TInstant LastStatusTimestamp_;
- TSchedulerCookieHolder StreamTimerCookieHolder_;
+ TActorId InactiveClientTimer_;
+ TActorId InactiveServerTimer_;
+ bool InactiveClientTimerPending_ = false;
+ bool InactiveServerTimerPending_ = false;
struct TBuffEntry
{
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5e9ef84363..8b8aebb771 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -697,7 +697,7 @@ message TFeatureFlags {
optional bool EnableExternalHive = 17 [default = true];
optional bool UseSchemeBoardCacheForSchemeRequests = 18 [default = true]; // deprecated: always true
optional bool CompileMinikqlWithVersion = 19 [default = true]; // deprecated: always true
- optional bool ReadTableWithSnapshot = 20 [default = true];
+ optional bool ReadTableWithSnapshot = 20 [default = true]; // deprecated: always true
optional bool ImportantTabletsUseSystemPool = 21 [default = true];
optional bool EnableOfflineSlaves = 22 [default = true]; // deprecated: always true
optional bool CheckDatabaseAccessPermission = 23 [default = false];
diff --git a/ydb/core/protos/stream.proto b/ydb/core/protos/stream.proto
index 429148ecec..1ec1547ec0 100644
--- a/ydb/core/protos/stream.proto
+++ b/ydb/core/protos/stream.proto
@@ -11,7 +11,7 @@ message TStreamingConfig {
optional uint32 MaxStreamingShards = 5 [default = 5];
// Timeouts used to interrupt inactive streams.
optional uint64 InactiveClientTimeout = 6 [default = 60000000];
- optional uint64 InactiveServerTimeout = 7 [default = 60000000];
+ optional uint64 InactiveServerTimeout = 7 [default = 0];
}
optional bool EnableInputStreams = 1 [default = true];