aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-07-13 15:07:21 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-07-13 15:07:21 +0300
commit0d3df705d3ae205e6b74f7a207191826123d7cc0 (patch)
treee75540884efdac0805224f900fe0d7f81682f667
parentd7e3cf9598d887845a54189ec729c4c852a1edab (diff)
downloadydb-0d3df705d3ae205e6b74f7a207191826123d7cc0.tar.gz
Lazy tables creation
-rw-r--r--ydb/core/kqp/common/events/script_executions.h13
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp74
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp21
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h5
-rw-r--r--ydb/services/ydb/ydb_olapstore_ut.cpp1
5 files changed, 61 insertions, 53 deletions
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index 26e876c494..f00d6294a5 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -55,6 +55,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
{
}
+ TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Ready(false)
+ , Status(status)
+ , Issues(std::move(issues))
+ {
+ }
+
bool Ready;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
@@ -82,6 +89,12 @@ struct TEvListScriptExecutionOperationsResponse : public NActors::TEventLocal<TE
{
}
+ TEvListScriptExecutionOperationsResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(std::move(issues))
+ {
+ }
+
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString NextPageToken;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 48a0f41fac..5b871f4f62 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -233,8 +233,6 @@ public:
AskSelfNodeInfo();
SendWhiteboardRequest();
ScheduleIdleSessionCheck(TDuration::Seconds(2));
-
- CheckScriptExecutionsTablesExistence();
}
TDuration GetSessionIdleDuration() const {
@@ -619,18 +617,8 @@ public:
}
void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) {
- if (AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) {
- if (ScriptExecutionsCreationStatus == EScriptExecutionsCreationStatus::Pending) {
- NYql::TIssues issues;
- issues.AddIssue("Not ready");
- Send(ev->Sender, new TEvKqp::TEvScriptResponse(Ydb::StatusIds::UNAVAILABLE, std::move(issues)));
- } else {
- Register(CreateScriptExecutionCreatorActor(std::move(ev)));
- }
- } else {
- NYql::TIssues issues;
- issues.AddIssue("ExecuteScript feature is not enabled");
- Send(ev->Sender, new TEvKqp::TEvScriptResponse(Ydb::StatusIds::UNSUPPORTED, std::move(issues)));
+ if (CheckScriptExecutionsTablesReady<TEvKqp::TEvScriptResponse>(ev)) {
+ Register(CreateScriptExecutionCreatorActor(std::move(ev)));
}
}
@@ -1399,32 +1387,64 @@ private:
NYql::NDq::SetYqlLogLevels(yqlPriority);
}
- void CheckScriptExecutionsTablesExistence() {
- if (AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) {
- Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()));
- } else {
- ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished;
+ template<typename TResponse, typename TEvent>
+ bool CheckScriptExecutionsTablesReady(TEvent& ev) {
+ if (!AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) {
+ NYql::TIssues issues;
+ issues.AddIssue("ExecuteScript feature is not enabled");
+ Send(ev->Sender, new TResponse(Ydb::StatusIds::UNSUPPORTED, std::move(issues)));
+ return false;
+ }
+
+ switch (ScriptExecutionsCreationStatus) {
+ case EScriptExecutionsCreationStatus::NotStarted:
+ ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
+ Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()));
+ [[fallthrough]];
+ case EScriptExecutionsCreationStatus::Pending:
+ if (DelayedEventsQueue.size() < 10000) {
+ DelayedEventsQueue.emplace_back(std::move(ev));
+ } else {
+ NYql::TIssues issues;
+ issues.AddIssue("Too many queued requests");
+ Send(ev->Sender, new TResponse(Ydb::StatusIds::OVERLOADED, std::move(issues)));
+ }
+ return false;
+ case EScriptExecutionsCreationStatus::Finished:
+ return true;
}
}
void Handle(TEvPrivate::TEvScriptExecutionsTablesCreationFinished::TPtr&) {
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished;
+ while (!DelayedEventsQueue.empty()) {
+ Send(std::move(DelayedEventsQueue.front()));
+ DelayedEventsQueue.pop_front();
+ }
}
void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) {
- Register(CreateForgetScriptExecutionOperationActor(std::move(ev)));
+ if (CheckScriptExecutionsTablesReady<TEvForgetScriptExecutionOperationResponse>(ev)) {
+ Register(CreateForgetScriptExecutionOperationActor(std::move(ev)));
+ }
}
void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) {
- Register(CreateGetScriptExecutionOperationActor(std::move(ev)));
+ if (CheckScriptExecutionsTablesReady<TEvGetScriptExecutionOperationResponse>(ev)) {
+ Register(CreateGetScriptExecutionOperationActor(std::move(ev)));
+ }
}
- void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) {
- Register(CreateListScriptExecutionOperationsActor(std::move(ev)));
+ void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) {
+ if (CheckScriptExecutionsTablesReady<TEvListScriptExecutionOperationsResponse>(ev)) {
+ Register(CreateListScriptExecutionOperationsActor(std::move(ev)));
+ }
}
- void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) {
- Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
+ void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) {
+ if (CheckScriptExecutionsTablesReady<TEvCancelScriptExecutionOperationResponse>(ev)) {
+ Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
+ }
}
void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
@@ -1490,10 +1510,12 @@ private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
enum class EScriptExecutionsCreationStatus {
+ NotStarted,
Pending,
Finished,
};
- EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
+ EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
+ std::deque<THolder<IEventHandle>> DelayedEventsQueue;
};
} // namespace
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 1d1248864f..e6235ee68c 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -443,8 +443,6 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
Client->InitRootScheme(settings.DomainRoot);
- NKikimr::NKqp::WaitForKqpProxyInit(GetDriver());
-
if (settings.WithSampleTables) {
CreateSampleTables();
}
@@ -1029,25 +1027,6 @@ void CreateSampleTablesWithIndex(TSession& session, bool populateTables) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
-void WaitForKqpProxyInit(const NYdb::TDriver& driver) {
- NYdb::NTable::TTableClient client(driver);
-
- while (true) {
- auto it = client.RetryOperationSync([=](TSession session) {
- return session.ExecuteDataQuery(R"(
- SELECT 1;
- )",
- TTxControl::BeginTx().CommitTx()
- ).GetValueSync();
- });
-
- if (it.IsSuccess()) {
- break;
- }
- Sleep(TDuration::MilliSeconds(100));
- }
-}
-
void InitRoot(Tests::TServer::TPtr server, TActorId sender) {
server->SetupRootStoragePools(sender);
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 6d627ba351..910cee52d3 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -267,11 +267,6 @@ inline void AssertSuccessResult(const NYdb::TStatus& result) {
void CreateSampleTablesWithIndex(NYdb::NTable::TSession& session, bool populateTables = true);
-// KQP proxy needs to asynchronously receive tenants info before it is able to serve requests that have
-// database name specified. Before that it returns errors.
-// This method retries a simple query until it succeeds.
-void WaitForKqpProxyInit(const NYdb::TDriver& driver);
-
void InitRoot(Tests::TServer::TPtr server, TActorId sender);
THolder<NKikimr::NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender,
diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp
index 4d41288197..053c63102c 100644
--- a/ydb/services/ydb/ydb_olapstore_ut.cpp
+++ b/ydb/services/ydb/ydb_olapstore_ut.cpp
@@ -58,7 +58,6 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ui16 grpc = server.GetPort();
TString location = TStringBuilder() << "localhost:" << grpc;
auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location).SetDatabase("/Root").SetAuthToken(token));
- NKqp::WaitForKqpProxyInit(connection);
return connection;
}