diff options
author | andrewproni <andrewproni@yandex-team.com> | 2023-07-13 15:07:21 +0300 |
---|---|---|
committer | andrewproni <andrewproni@yandex-team.com> | 2023-07-13 15:07:21 +0300 |
commit | 0d3df705d3ae205e6b74f7a207191826123d7cc0 (patch) | |
tree | e75540884efdac0805224f900fe0d7f81682f667 | |
parent | d7e3cf9598d887845a54189ec729c4c852a1edab (diff) | |
download | ydb-0d3df705d3ae205e6b74f7a207191826123d7cc0.tar.gz |
Lazy tables creation
-rw-r--r-- | ydb/core/kqp/common/events/script_executions.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 74 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 5 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_olapstore_ut.cpp | 1 |
5 files changed, 61 insertions, 53 deletions
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 26e876c494..f00d6294a5 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -55,6 +55,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG { } + TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Ready(false) + , Status(status) + , Issues(std::move(issues)) + { + } + bool Ready; Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; @@ -82,6 +89,12 @@ struct TEvListScriptExecutionOperationsResponse : public NActors::TEventLocal<TE { } + TEvListScriptExecutionOperationsResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(std::move(issues)) + { + } + Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; TString NextPageToken; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 48a0f41fac..5b871f4f62 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -233,8 +233,6 @@ public: AskSelfNodeInfo(); SendWhiteboardRequest(); ScheduleIdleSessionCheck(TDuration::Seconds(2)); - - CheckScriptExecutionsTablesExistence(); } TDuration GetSessionIdleDuration() const { @@ -619,18 +617,8 @@ public: } void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) { - if (AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) { - if (ScriptExecutionsCreationStatus == EScriptExecutionsCreationStatus::Pending) { - NYql::TIssues issues; - issues.AddIssue("Not ready"); - Send(ev->Sender, new TEvKqp::TEvScriptResponse(Ydb::StatusIds::UNAVAILABLE, std::move(issues))); - } else { - Register(CreateScriptExecutionCreatorActor(std::move(ev))); - } - } else { - NYql::TIssues issues; - issues.AddIssue("ExecuteScript feature is not enabled"); - Send(ev->Sender, new TEvKqp::TEvScriptResponse(Ydb::StatusIds::UNSUPPORTED, std::move(issues))); + if (CheckScriptExecutionsTablesReady<TEvKqp::TEvScriptResponse>(ev)) { + Register(CreateScriptExecutionCreatorActor(std::move(ev))); } } @@ -1399,32 +1387,64 @@ private: NYql::NDq::SetYqlLogLevels(yqlPriority); } - void CheckScriptExecutionsTablesExistence() { - if (AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) { - Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>())); - } else { - ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished; + template<typename TResponse, typename TEvent> + bool CheckScriptExecutionsTablesReady(TEvent& ev) { + if (!AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) { + NYql::TIssues issues; + issues.AddIssue("ExecuteScript feature is not enabled"); + Send(ev->Sender, new TResponse(Ydb::StatusIds::UNSUPPORTED, std::move(issues))); + return false; + } + + switch (ScriptExecutionsCreationStatus) { + case EScriptExecutionsCreationStatus::NotStarted: + ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending; + Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>())); + [[fallthrough]]; + case EScriptExecutionsCreationStatus::Pending: + if (DelayedEventsQueue.size() < 10000) { + DelayedEventsQueue.emplace_back(std::move(ev)); + } else { + NYql::TIssues issues; + issues.AddIssue("Too many queued requests"); + Send(ev->Sender, new TResponse(Ydb::StatusIds::OVERLOADED, std::move(issues))); + } + return false; + case EScriptExecutionsCreationStatus::Finished: + return true; } } void Handle(TEvPrivate::TEvScriptExecutionsTablesCreationFinished::TPtr&) { ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished; + while (!DelayedEventsQueue.empty()) { + Send(std::move(DelayedEventsQueue.front())); + DelayedEventsQueue.pop_front(); + } } void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) { - Register(CreateForgetScriptExecutionOperationActor(std::move(ev))); + if (CheckScriptExecutionsTablesReady<TEvForgetScriptExecutionOperationResponse>(ev)) { + Register(CreateForgetScriptExecutionOperationActor(std::move(ev))); + } } void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) { - Register(CreateGetScriptExecutionOperationActor(std::move(ev))); + if (CheckScriptExecutionsTablesReady<TEvGetScriptExecutionOperationResponse>(ev)) { + Register(CreateGetScriptExecutionOperationActor(std::move(ev))); + } } - void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { - Register(CreateListScriptExecutionOperationsActor(std::move(ev))); + void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { + if (CheckScriptExecutionsTablesReady<TEvListScriptExecutionOperationsResponse>(ev)) { + Register(CreateListScriptExecutionOperationsActor(std::move(ev))); + } } - void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { - Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); + void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { + if (CheckScriptExecutionsTablesReady<TEvCancelScriptExecutionOperationResponse>(ev)) { + Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); + } } void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) { @@ -1490,10 +1510,12 @@ private: NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; enum class EScriptExecutionsCreationStatus { + NotStarted, Pending, Finished, }; - EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending; + EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted; + std::deque<THolder<IEventHandle>> DelayedEventsQueue; }; } // namespace diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 1d1248864f..e6235ee68c 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -443,8 +443,6 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) { Client->InitRootScheme(settings.DomainRoot); - NKikimr::NKqp::WaitForKqpProxyInit(GetDriver()); - if (settings.WithSampleTables) { CreateSampleTables(); } @@ -1029,25 +1027,6 @@ void CreateSampleTablesWithIndex(TSession& session, bool populateTables) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } -void WaitForKqpProxyInit(const NYdb::TDriver& driver) { - NYdb::NTable::TTableClient client(driver); - - while (true) { - auto it = client.RetryOperationSync([=](TSession session) { - return session.ExecuteDataQuery(R"( - SELECT 1; - )", - TTxControl::BeginTx().CommitTx() - ).GetValueSync(); - }); - - if (it.IsSuccess()) { - break; - } - Sleep(TDuration::MilliSeconds(100)); - } -} - void InitRoot(Tests::TServer::TPtr server, TActorId sender) { server->SetupRootStoragePools(sender); } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 6d627ba351..910cee52d3 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -267,11 +267,6 @@ inline void AssertSuccessResult(const NYdb::TStatus& result) { void CreateSampleTablesWithIndex(NYdb::NTable::TSession& session, bool populateTables = true); -// KQP proxy needs to asynchronously receive tenants info before it is able to serve requests that have -// database name specified. Before that it returns errors. -// This method retries a simple query until it succeeds. -void WaitForKqpProxyInit(const NYdb::TDriver& driver); - void InitRoot(Tests::TServer::TPtr server, TActorId sender); THolder<NKikimr::NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender, diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp index 4d41288197..053c63102c 100644 --- a/ydb/services/ydb/ydb_olapstore_ut.cpp +++ b/ydb/services/ydb/ydb_olapstore_ut.cpp @@ -58,7 +58,6 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ui16 grpc = server.GetPort(); TString location = TStringBuilder() << "localhost:" << grpc; auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location).SetDatabase("/Root").SetAuthToken(token)); - NKqp::WaitForKqpProxyInit(connection); return connection; } |