diff options
author | komels <komels@yandex-team.ru> | 2022-06-06 16:47:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 16:47:12 +0300 |
commit | 00ae6ca0fc95a0f7710c47631e0cbe6f788589ac (patch) | |
tree | 568d7edee31c68e2c56b37083414dd4abd7fb9e8 | |
parent | 4b2bcca39c8932b48853d9107f7909516db2564a (diff) | |
download | ydb-00ae6ca0fc95a0f7710c47631e0cbe6f788589ac.tar.gz |
SQS-689
YC Search events from YMQ are now processed via KQP proxy, not external driver. Enables switching to SSL-port in YDB configs for YMQ (currently we use Grpc-port)
REVIEW: 2559837
REVIEW: 2609000
x-ydb-stable-ref: d3c9084f6b0c7a7cde35297b2380177541ecc92e
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/index_events_processor.cpp | 415 | ||||
-rw-r--r-- | ydb/core/ymq/actor/index_events_processor.h | 97 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.cpp | 25 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.h | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/serviceid.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/actor/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp | 6 |
8 files changed, 203 insertions, 353 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index eac22262db..6d1d24f1d9 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2094,11 +2094,8 @@ TSqsServiceInitializer::TSqsServiceInitializer(const TKikimrRunConfig& runConfig void TSqsServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { if (Config.GetSqsConfig().GetEnableSqs()) { - ui32 grpcPort = 0; - if (Config.HasGRpcConfig()) - grpcPort = Config.GetGRpcConfig().GetPort(); { - IActor* actor = NSQS::CreateSqsService(grpcPort); + IActor* actor = NSQS::CreateSqsService(); setup->LocalServices.emplace_back( NSQS::MakeSqsServiceID(NodeId), TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId)); diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp index 646e06420e..e1fd24587e 100644 --- a/ydb/core/ymq/actor/index_events_processor.cpp +++ b/ydb/core/ymq/actor/index_events_processor.cpp @@ -4,22 +4,18 @@ namespace NKikimr::NSQS { using namespace NActors; using namespace NJson; -using namespace NYdb; +using namespace NKikimr; constexpr TDuration DEFAULT_RETRY_TIMEOUT = TDuration::Seconds(10); -constexpr TDuration SHORT_RETRY_TIMEOUT = TDuration::Seconds(2); -constexpr TDuration DEFAULT_QUERY_TIMEOUT = TDuration::Seconds(30); TSearchEventsProcessor::TSearchEventsProcessor( const TString& root, const TDuration& reindexInterval, const TDuration& rescanInterval, - const TSimpleSharedPtr<NTable::TTableClient>& tableClient, - IEventsWriterWrapper::TPtr eventsWriter, - bool waitForWake + const TString& database, IEventsWriterWrapper::TPtr eventsWriter, bool waitForWake ) : RescanInterval(rescanInterval) , ReindexInterval(reindexInterval) - , TableClient(tableClient) , EventsWriter(eventsWriter) + , Database(database) , WaitForWake(waitForWake) { InitQueries(root); @@ -61,169 +57,146 @@ TSearchEventsProcessor::~TSearchEventsProcessor() { void TSearchEventsProcessor::Bootstrap(const TActorContext& ctx) { Become(&TSearchEventsProcessor::StateFunc); + State = EState::QueuesListingExecute; if (!WaitForWake) - ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag)); + ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup()); } -STFUNC(TSearchEventsProcessor::StateFunc) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvWakeup, HandleWakeup); +void TSearchEventsProcessor::HandleWakeup(TEvWakeup::TPtr&, const TActorContext& ctx) { + switch (State) { + case EState::Stopping: + return; + case EState::QueuesListingExecute: + return StartQueuesListing(ctx); + case EState::CleanupExecute: + return RunEventsCleanup(ctx); default: Y_FAIL(); - }; + } } -void TSearchEventsProcessor::HandleWakeup(TEvWakeup::TPtr& ev, const TActorContext& ctx) { - if (Stopping) +void TSearchEventsProcessor::HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record.GetRef(); + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + LOG_ERROR_S(ctx, NKikimrServices::SQS, + "YC Search events processor: Got error trying to perform request: " << record); + HandleFailure(ctx); return; - switch (ev->Get()->Tag) { - case StartQueuesListingTag: - StartQueuesListing(ctx); - break; - case QueuesListSessionStartedTag: - OnQueuesListSessionReady(ctx); - break; - case QueuesListPreparedTag: - OnQueuesListPrepared(ctx); - break; - case QueuesListQueryCompleteTag: - OnQueuesListQueryComplete(ctx); - break; - case RunEventsListingTag: - RunEventsListing(ctx); - break; - case EventsListingDoneTag: - OnEventsListingDone(ctx); - break; - case StartCleanupTag: - StartCleanupSession(ctx); - break; - case CleanupSessionReadyTag: - OnCleanupSessionReady(ctx); - break; - case CleanupTxReadyTag: - OnCleanupTxReady(ctx); - break; - case CleanupPreparedTag: - OnCleanupPrepared(ctx); - break; - case CleanupQueryCompleteTag: - OnCleanupQueryComplete(ctx); - break; - case CleanupTxCommittedTag: - OnCleanupTxCommitted(ctx); - break; - case StopAllTag: - Stopping = true; - EventsWriter->Close(); - break; + } + switch (State) { + case EState::QueuesListingExecute: + return OnQueuesListQueryComplete(ev, ctx); + case EState::EventsListingExecute: + return OnEventsListingDone(ev, ctx); + case EState::CleanupExecute: + return OnCleanupQueryComplete(ctx); + case EState::Stopping: + return StopSession(ctx); default: Y_FAIL(); } } -void TSearchEventsProcessor::StartQueuesListing(const TActorContext& ctx) { - LastQueuesKey = {}; - StartSession(QueuesListSessionStartedTag, ctx); +void TSearchEventsProcessor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + LOG_ERROR_S(ctx, NKikimrServices::SQS, "YC Search events processor: failed to list ymq events: " << record); + HandleFailure(ctx); } -void TSearchEventsProcessor::OnQueuesListSessionReady(const TActorContext &ctx) { - if (!SessionStarted(ctx)) { - return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag)); +void TSearchEventsProcessor::HandleFailure(const TActorContext& ctx) { + StopSession(ctx); + switch (State) { + case EState::EventsListingExecute: + State = EState::QueuesListingExecute; + case EState::QueuesListingExecute: + case EState::CleanupExecute: + return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup()); + case EState::Stopping: + return; + default: + Y_FAIL(); } - PrepareDataQuery(SelectQueuesQuery, QueuesListPreparedTag, ctx); } -void TSearchEventsProcessor::OnQueuesListPrepared(const TActorContext &ctx) { - if (!QueryPrepared(ctx)) { - return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag)); - } - RunQueuesListQuery(ctx, true); +void TSearchEventsProcessor::StartQueuesListing(const TActorContext& ctx) { + LastQueuesKey = {}; + StopSession(ctx); + ExistingQueues.clear(); + RunQueuesListQuery(ctx); } -void TSearchEventsProcessor::RunQueuesListQuery(const TActorContext& ctx, bool initial) { - if (initial) { - ExistingQueues.clear(); - } - Y_VERIFY(Session.Defined()); - Y_VERIFY(PreparedQuery.Defined()); +void TSearchEventsProcessor::RunQueuesListQuery(const TActorContext& ctx) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + auto* request = ev->Record.MutableRequest(); - auto builder = PreparedQuery.Get()->GetParamsBuilder(); - { - auto ¶m = builder.AddParam("$Account"); - param.Utf8(LastQueuesKey.Account); - param.Build(); - } - { - auto ¶m = builder.AddParam("$QueueName"); - param.Utf8(LastQueuesKey.QueueName); - param.Build(); - } - RunPrepared(builder.Build(), QueuesListQueryCompleteTag, ctx); + request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->SetKeepSession(false); + request->SetPreparedQuery(SelectQueuesQuery); + + NClient::TParameters params; + params["$Account"] = LastQueuesKey.Account; + params["$QueueName"] = LastQueuesKey.QueueName; + + RunQuery(SelectQueuesQuery, ¶ms, true, ctx); } -void TSearchEventsProcessor::OnQueuesListQueryComplete(const TActorContext& ctx) { - if (!QueryComplete(ctx)) { - return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag)); - } +void TSearchEventsProcessor::OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, + const TActorContext& ctx) { - const auto& result = QueryResult.Get()->GetResultSet(0); + auto& response = ev->Get()->Record.GetRef().GetResponse(); - TResultSetParser parser(result); - while (parser.TryNextRow()) { - TString queueName = *parser.ColumnParser("QueueName").GetOptionalUtf8(); - TString cloudId = *parser.ColumnParser("Account").GetOptionalUtf8(); - TString customName = *parser.ColumnParser("CustomQueueName").GetOptionalUtf8(); - ui64 createTs = *parser.ColumnParser("CreatedTimestamp").GetOptionalUint64(); - TString folderId = *parser.ColumnParser("FolderId").GetOptionalUtf8(); + Y_VERIFY(response.GetResults().size() == 1); + TString queueName, cloudId; + const auto& rr = response.GetResults(0).GetValue().GetStruct(0); + // "SELECT Account, QueueName, CustomQueueName, CreatedTimestamp, FolderId" + for (const auto& row : rr.GetList()) { + cloudId = row.GetStruct(0).GetOptional().GetText(); + queueName = row.GetStruct(1).GetOptional().GetText(); + auto customName = row.GetStruct(2).GetOptional().GetText(); + auto createTs = row.GetStruct(3).GetOptional().GetUint64(); + auto folderId = row.GetStruct(4).GetOptional().GetText(); auto insResult = ExistingQueues.insert(std::make_pair( queueName, TQueueEvent{EQueueEventType::Existed, createTs, customName, cloudId, folderId} )); Y_VERIFY(insResult.second); - LastQueuesKey.QueueName = queueName; - LastQueuesKey.Account = cloudId; } - if (result.Truncated()) { - RunQueuesListQuery(ctx, false); + if (SessionId.empty()) { + SessionId = response.GetSessionId(); + } else { + Y_VERIFY(SessionId == response.GetSessionId()); + } + + LastQueuesKey.QueueName = queueName; + LastQueuesKey.Account = cloudId; + + if (rr.ListSize() > 0) { + RunQueuesListQuery(ctx); } else { - Send(ctx.SelfID, new TEvWakeup(RunEventsListingTag)); + StopSession(ctx); + State = EState::EventsListingExecute; + RunEventsListing(ctx); } } void TSearchEventsProcessor::RunEventsListing(const TActorContext& ctx) { - QueryResult = Nothing(); - Status = TableClient->RetryOperation<NTable::TDataQueryResult>([this, query = SelectEventsQuery](NTable::TSession session) { - auto tx = NTable::TTxControl::BeginTx().CommitTx(); - return session.ExecuteDataQuery( - query, tx, NTable::TExecDataQuerySettings().ClientTimeout(DEFAULT_QUERY_TIMEOUT) - ).Apply([this](const auto& future) mutable { - QueryResult = future.GetValue(); - return future; - }); - }); - Apply(&Status, EventsListingDoneTag, ctx); + RunQuery(SelectEventsQuery, nullptr, true, ctx); } -void TSearchEventsProcessor::OnEventsListingDone(const TActorContext& ctx) { - auto& status = Status.GetValue(); - if (!status.IsSuccess()) { - Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(RunEventsListingTag)); - return; - } - - Y_VERIFY(QueryResult.Defined()); - Y_VERIFY(QueryResult.Get()->GetResultSets().size() == 1); - const auto& result = QueryResult.Get()->GetResultSet(0); - TResultSetParser parser(result); +void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { QueuesEvents.clear(); - - while (parser.TryNextRow()) { - TString cloudId = *parser.ColumnParser("Account").GetOptionalUtf8(); - TString queueName = *parser.ColumnParser("QueueName").GetOptionalUtf8(); - ui64 evType = *parser.ColumnParser("EventType").GetOptionalUint64(); - TString customName = *parser.ColumnParser("CustomQueueName").GetOptionalUtf8(); - TString folderId = *parser.ColumnParser("FolderId").GetOptionalUtf8(); - ui64 timestamp = *parser.ColumnParser("EventTimestamp").GetOptionalUint64(); + const auto& record = ev->Get()->Record.GetRef(); + Y_VERIFY(record.GetResponse().GetResults().size() == 1); + const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + + for (const auto& row : rr.GetList()) { + // "SELECT Account, QueueName, EventType, CustomQueueName, EventTimestamp, FolderId + auto cloudId = row.GetStruct(0).GetOptional().GetText(); + auto queueName = row.GetStruct(1).GetOptional().GetText(); + auto evType = row.GetStruct(2).GetOptional().GetUint64(); + auto customName = row.GetStruct(3).GetOptional().GetText(); + auto timestamp = row.GetStruct(4).GetOptional().GetUint64(); + auto folderId = row.GetStruct(5).GetOptional().GetText(); auto& qEvents = QueuesEvents[queueName]; auto insResult = qEvents.insert(std::make_pair( timestamp, TQueueEvent{EQueueEventType(evType), timestamp, customName, cloudId, folderId} @@ -233,68 +206,28 @@ void TSearchEventsProcessor::OnEventsListingDone(const TActorContext& ctx) { ProcessEventsQueue(ctx); } -void TSearchEventsProcessor::StartCleanupSession(const TActorContext& ctx) { - StartSession(CleanupSessionReadyTag, ctx); -} - -void TSearchEventsProcessor::OnCleanupSessionReady(const TActorContext &ctx) { - if (!SessionStarted(ctx)) { - return ctx.Schedule(SHORT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag)); - } - StartTx(CleanupTxReadyTag, ctx); -} - -void TSearchEventsProcessor::OnCleanupTxReady(const TActorContext &ctx) { - if (!TxStarted(ctx)) { - return ctx.Schedule(SHORT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag)); - } - PrepareDataQuery(DeleteEventQuery, CleanupPreparedTag, ctx); -} - -void TSearchEventsProcessor::OnCleanupPrepared(const TActorContext &ctx) { - if (!QueryPrepared(ctx)) { - return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag)); - } - RunEventsCleanup(ctx); -} - void TSearchEventsProcessor::RunEventsCleanup(const TActorContext& ctx) { - Y_VERIFY(PreparedQuery.Defined()); - Y_VERIFY(CurrentTx.Defined()); + State = EState::CleanupExecute; - auto builder = PreparedQuery.Get()->GetParamsBuilder(); - auto ¶m = builder.AddParam("$Events"); - param.BeginList(); + NClient::TParameters params; + auto param = params["$Events"]; for (const auto&[qName, events] : QueuesEvents) { for (const auto&[_, event]: events) { - param.AddListItem().BeginStruct().AddMember("Account").Utf8(event.CloudId) - .AddMember("QueueName").Utf8(qName) - .AddMember("EventType").Uint64(static_cast<ui64>(event.Type)) - .EndStruct(); - + auto item = param.AddListItem(); + item["Account"] = event.CloudId; + item["QueueName"] = qName; + item["EventType"] = static_cast<ui64>(event.Type); } } - param.EndList().Build(); - RunPrepared(builder.Build(), CleanupQueryCompleteTag, ctx); + RunQuery(DeleteEventQuery, ¶ms, false, ctx); } void TSearchEventsProcessor::OnCleanupQueryComplete(const TActorContext& ctx) { - if (!QueryComplete(ctx)) { - return ctx.Schedule(SHORT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag)); - } - CommitTx(CleanupTxCommittedTag, ctx); -} - -void TSearchEventsProcessor::OnCleanupTxCommitted(const TActorContext &ctx) { - if (!TxCommitted(ctx)) { - return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag)); - } QueuesEvents.clear(); ProcessReindexIfRequired(ctx); } void TSearchEventsProcessor::ProcessEventsQueue(const TActorContext& ctx) { - for (const auto& [qName, events] : QueuesEvents) { for (const auto& [ts, event]: events) { auto existsIter = ExistingQueues.find(qName); @@ -315,7 +248,8 @@ void TSearchEventsProcessor::ProcessEventsQueue(const TActorContext& ctx) { } } if (!QueuesEvents.empty()) { - Send(ctx.SelfID, new TEvWakeup(StartCleanupTag)); + State = EState::CleanupExecute; + RunEventsCleanup(ctx); } else { ProcessReindexIfRequired(ctx); } @@ -346,7 +280,8 @@ void TSearchEventsProcessor::ProcessReindexResult(const TActorContext &ctx) { } void TSearchEventsProcessor::WaitNextCycle(const TActorContext &ctx) { - ctx.Schedule(RescanInterval, new TEvWakeup(StartQueuesListingTag)); + State = EState::QueuesListingExecute; + ctx.Schedule(RescanInterval, new TEvWakeup()); } ui64 TSearchEventsProcessor::GetReindexCount() const { @@ -357,99 +292,47 @@ NActors::TActorSystem* TSearchEventsProcessor::GetActorSystem() { return TActivationContext::ActorSystem(); } -void TSearchEventsProcessor::StartSession(ui32 evTag, const TActorContext &ctx) { - Session = Nothing(); - CurrentTx = Nothing(); - PreparedQuery = Nothing(); - QueryResult = Nothing(); - SessionFuture = TableClient->GetSession(); - Apply(&SessionFuture, evTag, ctx); -} - -void TSearchEventsProcessor::PrepareDataQuery(const TString& query, ui32 evTag, const TActorContext& ctx) { - Y_VERIFY(Session.Defined()); - PreparedQuery = Nothing(); - QueryResult = Nothing(); - PrepQueryFuture = Session.Get()->PrepareDataQuery(query); - Apply(&PrepQueryFuture, evTag, ctx); -} - -void TSearchEventsProcessor::StartTx(ui32 evTag, const TActorContext &ctx) { - Y_VERIFY(Session.Defined()); - CurrentTx = Nothing(); - TxFuture = Session.Get()->BeginTransaction(); - Apply(&TxFuture, evTag, ctx); +void TSearchEventsProcessor::StopSession(const TActorContext& ctx) { + if (!SessionId.empty()) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(SessionId); + Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + SessionId = TString(); + } } -void TSearchEventsProcessor::CommitTx(ui32 evTag, const TActorContext &ctx) { - Y_VERIFY(Session.Defined()); - Y_VERIFY(CurrentTx.Defined()); - CommitTxFuture = CurrentTx.Get()->Commit(); - Apply(&CommitTxFuture, evTag, ctx); -} +void TSearchEventsProcessor::RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly, + const TActorContext& ctx) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + auto* request = ev->Record.MutableRequest(); -void TSearchEventsProcessor::RunPrepared(NYdb::TParams&& params, ui32 evTag, const TActorContext& ctx) { - Y_VERIFY(Session.Defined()); - Y_VERIFY(PreparedQuery.Defined()); - QueryResult = Nothing(); - NTable::TTxControl txControl = CurrentTx.Defined() ? NTable::TTxControl::Tx(*CurrentTx) - : NTable::TTxControl::BeginTx().CommitTx(); - QueryResultFuture = PreparedQuery.Get()->Execute(txControl, std::forward<NYdb::TParams>(params)); - Apply(&QueryResultFuture, evTag, ctx); -} + request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->SetKeepSession(State == EState::QueuesListingExecute); + request->SetQuery(query); -bool TSearchEventsProcessor::SessionStarted(const TActorContext&) { - Y_VERIFY(SessionFuture.HasValue()); - auto& value = SessionFuture.GetValueSync(); - if (!value.IsSuccess()) - return false; - Session = value.GetSession(); - return true; -} + if (!SessionId.empty()) { + request->SetSessionId(SessionId); + } + if (!Database.empty()) + request->SetDatabase(Database); -bool TSearchEventsProcessor::TxStarted(const TActorContext&) { - Y_VERIFY(Session.Defined()); - Y_VERIFY(TxFuture.HasValue()); - auto& value = TxFuture.GetValueSync(); - if (!value.IsSuccess()) - return false; - CurrentTx = value.GetTransaction(); - return true; -} -bool TSearchEventsProcessor::TxCommitted(const TActorContext&) { - Y_VERIFY(Session.Defined()); - Y_VERIFY(CurrentTx.Defined()); - Y_VERIFY(CommitTxFuture.HasValue()); - auto& value = CommitTxFuture.GetValueSync(); - if (!value.IsSuccess()) - return false; - CurrentTx = Nothing(); - return true; -} + request->MutableQueryCachePolicy()->set_keep_in_cache(true); -bool TSearchEventsProcessor::QueryPrepared(const TActorContext&) { - Y_VERIFY(Session.Defined()); - Y_VERIFY(PrepQueryFuture.HasValue()); - auto& value = PrepQueryFuture.GetValueSync(); - if (!value.IsSuccess()) { - return false; + if (readonly) { + request->MutableTxControl()->mutable_begin_tx()->mutable_stale_read_only(); + } else { + request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); } - PreparedQuery = value.GetQuery(); - return true; -} -bool TSearchEventsProcessor::QueryComplete(const TActorContext&) { - Y_VERIFY(Session.Defined()); - Y_VERIFY(QueryResultFuture.HasValue()); - QueryResult = QueryResultFuture.GetValueSync(); - if (!QueryResult.Get()->IsSuccess()) { - return false; + request->MutableTxControl()->set_commit_tx(true); + if (params != nullptr) { + request->MutableParameters()->Swap(params); } - return true; -} + Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); +} void TSearchEventsProcessor::SaveQueueEvent( - const TString& queueName, const TQueueEvent& event, const TActorContext& ctx ) { auto tsIsoString = TInstant::MilliSeconds(event.Timestamp).ToIsoStringLocal(); @@ -483,6 +366,10 @@ void TSearchEventsProcessor::SaveQueueEvent( SendJson(ss.Str(), ctx); } +void TSearchEventsProcessor::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { + Die(ctx); +} + void IEventsWriterWrapper::Close() { if (!Closed) { Closed = true; diff --git a/ydb/core/ymq/actor/index_events_processor.h b/ydb/core/ymq/actor/index_events_processor.h index a8ae462064..fd75e6dded 100644 --- a/ydb/core/ymq/actor/index_events_processor.h +++ b/ydb/core/ymq/actor/index_events_processor.h @@ -2,6 +2,9 @@ #include "events.h" #include <ydb/core/ymq/base/events_writer_iface.h> +#include <ydb/core/kqp/kqp.h> +#include <ydb/public/lib/deprecated/kicli/kicli.h> + #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -21,7 +24,7 @@ friend class TIndexProcesorTests; public: TSearchEventsProcessor(const TString& root, const TDuration& reindexInterval, const TDuration& rescanInterval, - const TSimpleSharedPtr<NYdb::NTable::TTableClient>& tableClient, + const TString& database, IEventsWriterWrapper::TPtr eventsWriter, bool waitForWake = false); @@ -36,23 +39,14 @@ private: TDuration RescanInterval; TDuration ReindexInterval; TInstant LastReindex = TInstant::Zero(); - TSimpleSharedPtr<NYdb::NTable::TTableClient> TableClient; IEventsWriterWrapper::TPtr EventsWriter; - NYdb::TAsyncStatus Status; - NYdb::NTable::TAsyncCreateSessionResult SessionFuture; - NYdb::NTable::TAsyncBeginTransactionResult TxFuture; - NYdb::NTable::TAsyncCommitTransactionResult CommitTxFuture; - NYdb::NTable::TAsyncPrepareQueryResult PrepQueryFuture; - NYdb::NTable::TAsyncDataQueryResult QueryResultFuture; + TString Database; + + TString SessionId; - TMaybe<NYdb::NTable::TSession> Session; - TMaybe<NYdb::NTable::TDataQuery> PreparedQuery; - TMaybe<NYdb::NTable::TTransaction> CurrentTx; - TMaybe<NYdb::NTable::TDataQueryResult> QueryResult; bool WaitForWake = false; - bool Stopping = false; enum class EQueueEventType { Deleted = 0, Created = 1, @@ -76,73 +70,56 @@ private: TAtomicCounter ReindexComplete = 0; - STATEFN(StateFunc); + STRICT_STFUNC(StateFunc, + HFunc(NActors::TEvents::TEvWakeup, HandleWakeup); + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse); + HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse); + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse); + ) + + void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); + void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx); + void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); + void HandleFailure(const TActorContext& ctx); + void StartQueuesListing(const TActorContext& ctx); - void OnQueuesListSessionReady(const TActorContext& ctx); - void OnQueuesListPrepared(const TActorContext& ctx); - void RunQueuesListQuery(const TActorContext& ctx, bool initial = false); - void OnQueuesListQueryComplete(const TActorContext& ctx); + void RunQueuesListQuery(const TActorContext& ctx); + void OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); void RunEventsListing(const TActorContext& ctx); - void OnEventsListingDone(const TActorContext& ctx); - void StartCleanupSession(const TActorContext& ctx); - void OnCleanupSessionReady(const TActorContext& ctx); - void OnCleanupTxReady(const TActorContext& ctx); - void OnCleanupPrepared(const TActorContext& ctx); + void OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); void RunEventsCleanup(const TActorContext& ctx); void OnCleanupQueryComplete(const TActorContext&ctx); - void OnCleanupTxCommitted(const TActorContext&ctx); + + void RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly, + const TActorContext& ctx); void ProcessEventsQueue(const TActorContext& ctx); void ProcessReindexIfRequired(const TActorContext& ctx); void ProcessReindexResult(const TActorContext& ctx); void WaitNextCycle(const TActorContext& ctx); + void HandlePoisonPill(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); + + void StopSession(const TActorContext& ctx); - void StartSession(ui32 evTag, const TActorContext& ctx); - void PrepareDataQuery(const TString& query, ui32 evTag, const TActorContext& ctx); - void StartTx(ui32 evTag, const TActorContext& ctx); - void CommitTx(ui32 evTag, const TActorContext& ctx); - void RunPrepared(NYdb::TParams&& params, ui32 evTag, const TActorContext& ctx); - bool SessionStarted(const TActorContext& ctx); - bool QueryPrepared(const TActorContext& ctx); - bool QueryComplete(const TActorContext& ctx); - bool TxStarted(const TActorContext& ctx); - bool TxCommitted(const TActorContext& ctx); - - - template<class TFutureType> - void Apply(TFutureType* future, ui32 evTag, const TActorContext& ctx) { - future->Subscribe( - [evTag, as = GetActorSystem(), selfId = ctx.SelfID](const auto&) - {as->Send(selfId, new TEvWakeup(evTag));} - ); - } void InitQueries(const TString& root); void SaveQueueEvent(const TString& queueName, const TQueueEvent& event, const TActorContext& ctx); void SendJson(const TString& json, const TActorContext &ctx); static NActors::TActorSystem* GetActorSystem(); - constexpr static ui32 StartQueuesListingTag = 1; - constexpr static ui32 QueuesListSessionStartedTag = 2; - constexpr static ui32 QueuesListPreparedTag = 3; - constexpr static ui32 QueuesListQueryCompleteTag = 4; - - constexpr static ui32 RunEventsListingTag = 10; - constexpr static ui32 EventsListingDoneTag = 11; - - constexpr static ui32 StartCleanupTag = 20; - constexpr static ui32 CleanupSessionReadyTag = 21; - constexpr static ui32 CleanupTxReadyTag = 22; - constexpr static ui32 CleanupPreparedTag = 23; - constexpr static ui32 CleanupQueryCompleteTag = 24; - constexpr static ui32 CleanupTxCommittedTag = 25; - - constexpr static ui32 StopAllTag = 99; - + enum class EState { + Initial, + QueuesListingExecute, + EventsListingExecute, + CleanupExecute, + Stopping + }; + EState State = EState::Initial; }; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index 2757d23a38..ceb39c91a0 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -269,12 +269,10 @@ static TString GetEndpoint(const NKikimrConfig::TSqsConfig& config) { } } -TSqsService::TSqsService(const TMaybe<ui32>& ydbPort) { - if (ydbPort.Defined()) { - YcSearchEventsConfig.GrpcPort = *ydbPort; - } +TSqsService::TSqsService() { DebugInfo->SqsServiceActorPtr = this; } + TSqsService::~TSqsService() { DebugInfo->SqsServiceActorPtr = nullptr; } @@ -309,26 +307,21 @@ void TSqsService::Bootstrap() { RequestSqsUsersList(); RequestSqsQueuesList(); - if (Cfg().HasYcSearchEventsConfig() && YcSearchEventsConfig.GrpcPort) { + if (Cfg().HasYcSearchEventsConfig()) { auto& ycSearchCfg = Cfg().GetYcSearchEventsConfig(); YcSearchEventsConfig.Enabled = ycSearchCfg.GetEnableYcSearch(); YcSearchEventsConfig.ReindexInterval = TDuration::Seconds(ycSearchCfg.GetReindexIntervalSeconds()); YcSearchEventsConfig.RescanInterval = TDuration::Seconds(ycSearchCfg.GetRescanIntervalSeconds()); - auto driverConfig = NYdb::TDriverConfig().SetEndpoint( - TStringBuilder() << "localhost:" << YcSearchEventsConfig.GrpcPort); if (ycSearchCfg.HasTenantMode() && ycSearchCfg.GetTenantMode()) { - driverConfig.SetDatabase(Cfg().GetRoot()); + YcSearchEventsConfig.Database = Cfg().GetRoot(); YcSearchEventsConfig.TenantMode = true; } auto factory = AppData()->SqsAuthFactory; Y_VERIFY(factory); - driverConfig.SetCredentialsProviderFactory(factory->CreateCredentialsProviderFactory(Cfg())); - - YcSearchEventsConfig.Driver = MakeHolder<NYdb::TDriver>(driverConfig); MakeAndRegisterYcEventsProcessor(); } } @@ -1307,17 +1300,13 @@ void TSqsService::MakeAndRegisterYcEventsProcessor() { Y_VERIFY(factory); Register(new TSearchEventsProcessor( root, YcSearchEventsConfig.ReindexInterval, YcSearchEventsConfig.RescanInterval, - MakeSimpleShared<NYdb::NTable::TTableClient>(*YcSearchEventsConfig.Driver), + YcSearchEventsConfig.Database, factory->CreateEventsWriter(Cfg(), GetSqsServiceCounters(AppData()->Counters, "yc_unified_agent")) )); } -// -//IActor* CreateSqsService(const TYcSearchEventsConfig& ycSearchEventsConfig) { -// return new TSqsService(ycSearchEventsConfig); -//} -IActor* CreateSqsService(TMaybe<ui32> ydbPort) { - return new TSqsService(ydbPort); +IActor* CreateSqsService() { + return new TSqsService(); } } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h index caf091c24b..b351a9eb36 100644 --- a/ydb/core/ymq/actor/service.h +++ b/ydb/core/ymq/actor/service.h @@ -22,7 +22,7 @@ class TSqsService : public TActorBootstrapped<TSqsService> { public: - TSqsService(const TMaybe<ui32>& ydbPort); + TSqsService(); ~TSqsService(); void Bootstrap(); @@ -161,8 +161,7 @@ private: struct TYcSearchEventsConfig { - THolder<NYdb::TDriver> Driver; - ui32 GrpcPort = 0; + TString Database; bool Enabled = false; bool TenantMode = false; TDuration ReindexInterval = TDuration::Hours(4); diff --git a/ydb/core/ymq/actor/serviceid.h b/ydb/core/ymq/actor/serviceid.h index 7219d918ba..0df86cdd56 100644 --- a/ydb/core/ymq/actor/serviceid.h +++ b/ydb/core/ymq/actor/serviceid.h @@ -31,7 +31,7 @@ inline TActorId MakeSqsMeteringServiceID() { return TActorId(0, TStringBuf("SQS_METER")); } -IActor* CreateSqsService(TMaybe<ui32> ydbPort = Nothing()); +IActor* CreateSqsService(); IActor* CreateSqsProxyService(); IActor* CreateSqsAccessService(const TString& address, const TString& pathToRootCA); IActor* CreateSqsFolderService(const TString& address, const TString& pathToRootCA); diff --git a/ydb/core/ymq/actor/ya.make b/ydb/core/ymq/actor/ya.make index 070653bb57..dae80ad61d 100644 --- a/ydb/core/ymq/actor/ya.make +++ b/ydb/core/ymq/actor/ya.make @@ -90,6 +90,7 @@ PEERDIR( ydb/public/lib/value ydb/public/sdk/cpp/client/ydb_types/credentials ydb/library/yql/minikql + ydb/public/lib/deprecated/client ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp index e484f117be..55058e83d4 100644 --- a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp +++ b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp @@ -54,7 +54,7 @@ private: Processor = new TSearchEventsProcessor( SchemePath, TDuration::Minutes(10), TDuration::MilliSeconds(100), - parent->TableClient, + TString(), EventsWriter, true ); @@ -71,7 +71,7 @@ private: } ~TTestRunner() { if (Processor != nullptr) { - auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvWakeup(TSearchEventsProcessor::StopAllTag)); + auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvents::TEvPoisonPill()); Parent->Server->GetRuntime()->Send(handle); } } @@ -228,7 +228,7 @@ private: auto initialCount = Processor->GetReindexCount(); DispatchOpts.CustomFinalCondition = [initialCount, processor = Processor]() { return processor->GetReindexCount() > initialCount; }; if (!initialCount) { - auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvWakeup(TSearchEventsProcessor::StartQueuesListingTag)); + auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvWakeup()); Parent->Server->GetRuntime()->Send(handle); } Parent->Server->GetRuntime()->DispatchEvents(DispatchOpts, TDuration::Minutes(1)); |