aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-06-06 16:47:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 16:47:12 +0300
commit00ae6ca0fc95a0f7710c47631e0cbe6f788589ac (patch)
tree568d7edee31c68e2c56b37083414dd4abd7fb9e8
parent4b2bcca39c8932b48853d9107f7909516db2564a (diff)
downloadydb-00ae6ca0fc95a0f7710c47631e0cbe6f788589ac.tar.gz
SQS-689
YC Search events from YMQ are now processed via KQP proxy, not external driver. Enables switching to SSL-port in YDB configs for YMQ (currently we use Grpc-port) REVIEW: 2559837 REVIEW: 2609000 x-ydb-stable-ref: d3c9084f6b0c7a7cde35297b2380177541ecc92e
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp5
-rw-r--r--ydb/core/ymq/actor/index_events_processor.cpp415
-rw-r--r--ydb/core/ymq/actor/index_events_processor.h97
-rw-r--r--ydb/core/ymq/actor/service.cpp25
-rw-r--r--ydb/core/ymq/actor/service.h5
-rw-r--r--ydb/core/ymq/actor/serviceid.h2
-rw-r--r--ydb/core/ymq/actor/ya.make1
-rw-r--r--ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp6
8 files changed, 203 insertions, 353 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index eac22262db..6d1d24f1d9 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2094,11 +2094,8 @@ TSqsServiceInitializer::TSqsServiceInitializer(const TKikimrRunConfig& runConfig
void TSqsServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
if (Config.GetSqsConfig().GetEnableSqs()) {
- ui32 grpcPort = 0;
- if (Config.HasGRpcConfig())
- grpcPort = Config.GetGRpcConfig().GetPort();
{
- IActor* actor = NSQS::CreateSqsService(grpcPort);
+ IActor* actor = NSQS::CreateSqsService();
setup->LocalServices.emplace_back(
NSQS::MakeSqsServiceID(NodeId),
TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId));
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp
index 646e06420e..e1fd24587e 100644
--- a/ydb/core/ymq/actor/index_events_processor.cpp
+++ b/ydb/core/ymq/actor/index_events_processor.cpp
@@ -4,22 +4,18 @@
namespace NKikimr::NSQS {
using namespace NActors;
using namespace NJson;
-using namespace NYdb;
+using namespace NKikimr;
constexpr TDuration DEFAULT_RETRY_TIMEOUT = TDuration::Seconds(10);
-constexpr TDuration SHORT_RETRY_TIMEOUT = TDuration::Seconds(2);
-constexpr TDuration DEFAULT_QUERY_TIMEOUT = TDuration::Seconds(30);
TSearchEventsProcessor::TSearchEventsProcessor(
const TString& root, const TDuration& reindexInterval, const TDuration& rescanInterval,
- const TSimpleSharedPtr<NTable::TTableClient>& tableClient,
- IEventsWriterWrapper::TPtr eventsWriter,
- bool waitForWake
+ const TString& database, IEventsWriterWrapper::TPtr eventsWriter, bool waitForWake
)
: RescanInterval(rescanInterval)
, ReindexInterval(reindexInterval)
- , TableClient(tableClient)
, EventsWriter(eventsWriter)
+ , Database(database)
, WaitForWake(waitForWake)
{
InitQueries(root);
@@ -61,169 +57,146 @@ TSearchEventsProcessor::~TSearchEventsProcessor() {
void TSearchEventsProcessor::Bootstrap(const TActorContext& ctx) {
Become(&TSearchEventsProcessor::StateFunc);
+ State = EState::QueuesListingExecute;
if (!WaitForWake)
- ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag));
+ ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup());
}
-STFUNC(TSearchEventsProcessor::StateFunc) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvWakeup, HandleWakeup);
+void TSearchEventsProcessor::HandleWakeup(TEvWakeup::TPtr&, const TActorContext& ctx) {
+ switch (State) {
+ case EState::Stopping:
+ return;
+ case EState::QueuesListingExecute:
+ return StartQueuesListing(ctx);
+ case EState::CleanupExecute:
+ return RunEventsCleanup(ctx);
default:
Y_FAIL();
- };
+ }
}
-void TSearchEventsProcessor::HandleWakeup(TEvWakeup::TPtr& ev, const TActorContext& ctx) {
- if (Stopping)
+void TSearchEventsProcessor::HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record.GetRef();
+ if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ LOG_ERROR_S(ctx, NKikimrServices::SQS,
+ "YC Search events processor: Got error trying to perform request: " << record);
+ HandleFailure(ctx);
return;
- switch (ev->Get()->Tag) {
- case StartQueuesListingTag:
- StartQueuesListing(ctx);
- break;
- case QueuesListSessionStartedTag:
- OnQueuesListSessionReady(ctx);
- break;
- case QueuesListPreparedTag:
- OnQueuesListPrepared(ctx);
- break;
- case QueuesListQueryCompleteTag:
- OnQueuesListQueryComplete(ctx);
- break;
- case RunEventsListingTag:
- RunEventsListing(ctx);
- break;
- case EventsListingDoneTag:
- OnEventsListingDone(ctx);
- break;
- case StartCleanupTag:
- StartCleanupSession(ctx);
- break;
- case CleanupSessionReadyTag:
- OnCleanupSessionReady(ctx);
- break;
- case CleanupTxReadyTag:
- OnCleanupTxReady(ctx);
- break;
- case CleanupPreparedTag:
- OnCleanupPrepared(ctx);
- break;
- case CleanupQueryCompleteTag:
- OnCleanupQueryComplete(ctx);
- break;
- case CleanupTxCommittedTag:
- OnCleanupTxCommitted(ctx);
- break;
- case StopAllTag:
- Stopping = true;
- EventsWriter->Close();
- break;
+ }
+ switch (State) {
+ case EState::QueuesListingExecute:
+ return OnQueuesListQueryComplete(ev, ctx);
+ case EState::EventsListingExecute:
+ return OnEventsListingDone(ev, ctx);
+ case EState::CleanupExecute:
+ return OnCleanupQueryComplete(ctx);
+ case EState::Stopping:
+ return StopSession(ctx);
default:
Y_FAIL();
}
}
-void TSearchEventsProcessor::StartQueuesListing(const TActorContext& ctx) {
- LastQueuesKey = {};
- StartSession(QueuesListSessionStartedTag, ctx);
+void TSearchEventsProcessor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+ LOG_ERROR_S(ctx, NKikimrServices::SQS, "YC Search events processor: failed to list ymq events: " << record);
+ HandleFailure(ctx);
}
-void TSearchEventsProcessor::OnQueuesListSessionReady(const TActorContext &ctx) {
- if (!SessionStarted(ctx)) {
- return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag));
+void TSearchEventsProcessor::HandleFailure(const TActorContext& ctx) {
+ StopSession(ctx);
+ switch (State) {
+ case EState::EventsListingExecute:
+ State = EState::QueuesListingExecute;
+ case EState::QueuesListingExecute:
+ case EState::CleanupExecute:
+ return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup());
+ case EState::Stopping:
+ return;
+ default:
+ Y_FAIL();
}
- PrepareDataQuery(SelectQueuesQuery, QueuesListPreparedTag, ctx);
}
-void TSearchEventsProcessor::OnQueuesListPrepared(const TActorContext &ctx) {
- if (!QueryPrepared(ctx)) {
- return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag));
- }
- RunQueuesListQuery(ctx, true);
+void TSearchEventsProcessor::StartQueuesListing(const TActorContext& ctx) {
+ LastQueuesKey = {};
+ StopSession(ctx);
+ ExistingQueues.clear();
+ RunQueuesListQuery(ctx);
}
-void TSearchEventsProcessor::RunQueuesListQuery(const TActorContext& ctx, bool initial) {
- if (initial) {
- ExistingQueues.clear();
- }
- Y_VERIFY(Session.Defined());
- Y_VERIFY(PreparedQuery.Defined());
+void TSearchEventsProcessor::RunQueuesListQuery(const TActorContext& ctx) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ auto* request = ev->Record.MutableRequest();
- auto builder = PreparedQuery.Get()->GetParamsBuilder();
- {
- auto &param = builder.AddParam("$Account");
- param.Utf8(LastQueuesKey.Account);
- param.Build();
- }
- {
- auto &param = builder.AddParam("$QueueName");
- param.Utf8(LastQueuesKey.QueueName);
- param.Build();
- }
- RunPrepared(builder.Build(), QueuesListQueryCompleteTag, ctx);
+ request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->SetKeepSession(false);
+ request->SetPreparedQuery(SelectQueuesQuery);
+
+ NClient::TParameters params;
+ params["$Account"] = LastQueuesKey.Account;
+ params["$QueueName"] = LastQueuesKey.QueueName;
+
+ RunQuery(SelectQueuesQuery, &params, true, ctx);
}
-void TSearchEventsProcessor::OnQueuesListQueryComplete(const TActorContext& ctx) {
- if (!QueryComplete(ctx)) {
- return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartQueuesListingTag));
- }
+void TSearchEventsProcessor::OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev,
+ const TActorContext& ctx) {
- const auto& result = QueryResult.Get()->GetResultSet(0);
+ auto& response = ev->Get()->Record.GetRef().GetResponse();
- TResultSetParser parser(result);
- while (parser.TryNextRow()) {
- TString queueName = *parser.ColumnParser("QueueName").GetOptionalUtf8();
- TString cloudId = *parser.ColumnParser("Account").GetOptionalUtf8();
- TString customName = *parser.ColumnParser("CustomQueueName").GetOptionalUtf8();
- ui64 createTs = *parser.ColumnParser("CreatedTimestamp").GetOptionalUint64();
- TString folderId = *parser.ColumnParser("FolderId").GetOptionalUtf8();
+ Y_VERIFY(response.GetResults().size() == 1);
+ TString queueName, cloudId;
+ const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
+ // "SELECT Account, QueueName, CustomQueueName, CreatedTimestamp, FolderId"
+ for (const auto& row : rr.GetList()) {
+ cloudId = row.GetStruct(0).GetOptional().GetText();
+ queueName = row.GetStruct(1).GetOptional().GetText();
+ auto customName = row.GetStruct(2).GetOptional().GetText();
+ auto createTs = row.GetStruct(3).GetOptional().GetUint64();
+ auto folderId = row.GetStruct(4).GetOptional().GetText();
auto insResult = ExistingQueues.insert(std::make_pair(
queueName, TQueueEvent{EQueueEventType::Existed, createTs, customName, cloudId, folderId}
));
Y_VERIFY(insResult.second);
- LastQueuesKey.QueueName = queueName;
- LastQueuesKey.Account = cloudId;
}
- if (result.Truncated()) {
- RunQueuesListQuery(ctx, false);
+ if (SessionId.empty()) {
+ SessionId = response.GetSessionId();
+ } else {
+ Y_VERIFY(SessionId == response.GetSessionId());
+ }
+
+ LastQueuesKey.QueueName = queueName;
+ LastQueuesKey.Account = cloudId;
+
+ if (rr.ListSize() > 0) {
+ RunQueuesListQuery(ctx);
} else {
- Send(ctx.SelfID, new TEvWakeup(RunEventsListingTag));
+ StopSession(ctx);
+ State = EState::EventsListingExecute;
+ RunEventsListing(ctx);
}
}
void TSearchEventsProcessor::RunEventsListing(const TActorContext& ctx) {
- QueryResult = Nothing();
- Status = TableClient->RetryOperation<NTable::TDataQueryResult>([this, query = SelectEventsQuery](NTable::TSession session) {
- auto tx = NTable::TTxControl::BeginTx().CommitTx();
- return session.ExecuteDataQuery(
- query, tx, NTable::TExecDataQuerySettings().ClientTimeout(DEFAULT_QUERY_TIMEOUT)
- ).Apply([this](const auto& future) mutable {
- QueryResult = future.GetValue();
- return future;
- });
- });
- Apply(&Status, EventsListingDoneTag, ctx);
+ RunQuery(SelectEventsQuery, nullptr, true, ctx);
}
-void TSearchEventsProcessor::OnEventsListingDone(const TActorContext& ctx) {
- auto& status = Status.GetValue();
- if (!status.IsSuccess()) {
- Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(RunEventsListingTag));
- return;
- }
-
- Y_VERIFY(QueryResult.Defined());
- Y_VERIFY(QueryResult.Get()->GetResultSets().size() == 1);
- const auto& result = QueryResult.Get()->GetResultSet(0);
- TResultSetParser parser(result);
+void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
QueuesEvents.clear();
-
- while (parser.TryNextRow()) {
- TString cloudId = *parser.ColumnParser("Account").GetOptionalUtf8();
- TString queueName = *parser.ColumnParser("QueueName").GetOptionalUtf8();
- ui64 evType = *parser.ColumnParser("EventType").GetOptionalUint64();
- TString customName = *parser.ColumnParser("CustomQueueName").GetOptionalUtf8();
- TString folderId = *parser.ColumnParser("FolderId").GetOptionalUtf8();
- ui64 timestamp = *parser.ColumnParser("EventTimestamp").GetOptionalUint64();
+ const auto& record = ev->Get()->Record.GetRef();
+ Y_VERIFY(record.GetResponse().GetResults().size() == 1);
+ const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
+
+ for (const auto& row : rr.GetList()) {
+ // "SELECT Account, QueueName, EventType, CustomQueueName, EventTimestamp, FolderId
+ auto cloudId = row.GetStruct(0).GetOptional().GetText();
+ auto queueName = row.GetStruct(1).GetOptional().GetText();
+ auto evType = row.GetStruct(2).GetOptional().GetUint64();
+ auto customName = row.GetStruct(3).GetOptional().GetText();
+ auto timestamp = row.GetStruct(4).GetOptional().GetUint64();
+ auto folderId = row.GetStruct(5).GetOptional().GetText();
auto& qEvents = QueuesEvents[queueName];
auto insResult = qEvents.insert(std::make_pair(
timestamp, TQueueEvent{EQueueEventType(evType), timestamp, customName, cloudId, folderId}
@@ -233,68 +206,28 @@ void TSearchEventsProcessor::OnEventsListingDone(const TActorContext& ctx) {
ProcessEventsQueue(ctx);
}
-void TSearchEventsProcessor::StartCleanupSession(const TActorContext& ctx) {
- StartSession(CleanupSessionReadyTag, ctx);
-}
-
-void TSearchEventsProcessor::OnCleanupSessionReady(const TActorContext &ctx) {
- if (!SessionStarted(ctx)) {
- return ctx.Schedule(SHORT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag));
- }
- StartTx(CleanupTxReadyTag, ctx);
-}
-
-void TSearchEventsProcessor::OnCleanupTxReady(const TActorContext &ctx) {
- if (!TxStarted(ctx)) {
- return ctx.Schedule(SHORT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag));
- }
- PrepareDataQuery(DeleteEventQuery, CleanupPreparedTag, ctx);
-}
-
-void TSearchEventsProcessor::OnCleanupPrepared(const TActorContext &ctx) {
- if (!QueryPrepared(ctx)) {
- return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag));
- }
- RunEventsCleanup(ctx);
-}
-
void TSearchEventsProcessor::RunEventsCleanup(const TActorContext& ctx) {
- Y_VERIFY(PreparedQuery.Defined());
- Y_VERIFY(CurrentTx.Defined());
+ State = EState::CleanupExecute;
- auto builder = PreparedQuery.Get()->GetParamsBuilder();
- auto &param = builder.AddParam("$Events");
- param.BeginList();
+ NClient::TParameters params;
+ auto param = params["$Events"];
for (const auto&[qName, events] : QueuesEvents) {
for (const auto&[_, event]: events) {
- param.AddListItem().BeginStruct().AddMember("Account").Utf8(event.CloudId)
- .AddMember("QueueName").Utf8(qName)
- .AddMember("EventType").Uint64(static_cast<ui64>(event.Type))
- .EndStruct();
-
+ auto item = param.AddListItem();
+ item["Account"] = event.CloudId;
+ item["QueueName"] = qName;
+ item["EventType"] = static_cast<ui64>(event.Type);
}
}
- param.EndList().Build();
- RunPrepared(builder.Build(), CleanupQueryCompleteTag, ctx);
+ RunQuery(DeleteEventQuery, &params, false, ctx);
}
void TSearchEventsProcessor::OnCleanupQueryComplete(const TActorContext& ctx) {
- if (!QueryComplete(ctx)) {
- return ctx.Schedule(SHORT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag));
- }
- CommitTx(CleanupTxCommittedTag, ctx);
-}
-
-void TSearchEventsProcessor::OnCleanupTxCommitted(const TActorContext &ctx) {
- if (!TxCommitted(ctx)) {
- return ctx.Schedule(DEFAULT_RETRY_TIMEOUT, new TEvWakeup(StartCleanupTag));
- }
QueuesEvents.clear();
ProcessReindexIfRequired(ctx);
}
void TSearchEventsProcessor::ProcessEventsQueue(const TActorContext& ctx) {
-
for (const auto& [qName, events] : QueuesEvents) {
for (const auto& [ts, event]: events) {
auto existsIter = ExistingQueues.find(qName);
@@ -315,7 +248,8 @@ void TSearchEventsProcessor::ProcessEventsQueue(const TActorContext& ctx) {
}
}
if (!QueuesEvents.empty()) {
- Send(ctx.SelfID, new TEvWakeup(StartCleanupTag));
+ State = EState::CleanupExecute;
+ RunEventsCleanup(ctx);
} else {
ProcessReindexIfRequired(ctx);
}
@@ -346,7 +280,8 @@ void TSearchEventsProcessor::ProcessReindexResult(const TActorContext &ctx) {
}
void TSearchEventsProcessor::WaitNextCycle(const TActorContext &ctx) {
- ctx.Schedule(RescanInterval, new TEvWakeup(StartQueuesListingTag));
+ State = EState::QueuesListingExecute;
+ ctx.Schedule(RescanInterval, new TEvWakeup());
}
ui64 TSearchEventsProcessor::GetReindexCount() const {
@@ -357,99 +292,47 @@ NActors::TActorSystem* TSearchEventsProcessor::GetActorSystem() {
return TActivationContext::ActorSystem();
}
-void TSearchEventsProcessor::StartSession(ui32 evTag, const TActorContext &ctx) {
- Session = Nothing();
- CurrentTx = Nothing();
- PreparedQuery = Nothing();
- QueryResult = Nothing();
- SessionFuture = TableClient->GetSession();
- Apply(&SessionFuture, evTag, ctx);
-}
-
-void TSearchEventsProcessor::PrepareDataQuery(const TString& query, ui32 evTag, const TActorContext& ctx) {
- Y_VERIFY(Session.Defined());
- PreparedQuery = Nothing();
- QueryResult = Nothing();
- PrepQueryFuture = Session.Get()->PrepareDataQuery(query);
- Apply(&PrepQueryFuture, evTag, ctx);
-}
-
-void TSearchEventsProcessor::StartTx(ui32 evTag, const TActorContext &ctx) {
- Y_VERIFY(Session.Defined());
- CurrentTx = Nothing();
- TxFuture = Session.Get()->BeginTransaction();
- Apply(&TxFuture, evTag, ctx);
+void TSearchEventsProcessor::StopSession(const TActorContext& ctx) {
+ if (!SessionId.empty()) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(SessionId);
+ Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+ SessionId = TString();
+ }
}
-void TSearchEventsProcessor::CommitTx(ui32 evTag, const TActorContext &ctx) {
- Y_VERIFY(Session.Defined());
- Y_VERIFY(CurrentTx.Defined());
- CommitTxFuture = CurrentTx.Get()->Commit();
- Apply(&CommitTxFuture, evTag, ctx);
-}
+void TSearchEventsProcessor::RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly,
+ const TActorContext& ctx) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ auto* request = ev->Record.MutableRequest();
-void TSearchEventsProcessor::RunPrepared(NYdb::TParams&& params, ui32 evTag, const TActorContext& ctx) {
- Y_VERIFY(Session.Defined());
- Y_VERIFY(PreparedQuery.Defined());
- QueryResult = Nothing();
- NTable::TTxControl txControl = CurrentTx.Defined() ? NTable::TTxControl::Tx(*CurrentTx)
- : NTable::TTxControl::BeginTx().CommitTx();
- QueryResultFuture = PreparedQuery.Get()->Execute(txControl, std::forward<NYdb::TParams>(params));
- Apply(&QueryResultFuture, evTag, ctx);
-}
+ request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->SetKeepSession(State == EState::QueuesListingExecute);
+ request->SetQuery(query);
-bool TSearchEventsProcessor::SessionStarted(const TActorContext&) {
- Y_VERIFY(SessionFuture.HasValue());
- auto& value = SessionFuture.GetValueSync();
- if (!value.IsSuccess())
- return false;
- Session = value.GetSession();
- return true;
-}
+ if (!SessionId.empty()) {
+ request->SetSessionId(SessionId);
+ }
+ if (!Database.empty())
+ request->SetDatabase(Database);
-bool TSearchEventsProcessor::TxStarted(const TActorContext&) {
- Y_VERIFY(Session.Defined());
- Y_VERIFY(TxFuture.HasValue());
- auto& value = TxFuture.GetValueSync();
- if (!value.IsSuccess())
- return false;
- CurrentTx = value.GetTransaction();
- return true;
-}
-bool TSearchEventsProcessor::TxCommitted(const TActorContext&) {
- Y_VERIFY(Session.Defined());
- Y_VERIFY(CurrentTx.Defined());
- Y_VERIFY(CommitTxFuture.HasValue());
- auto& value = CommitTxFuture.GetValueSync();
- if (!value.IsSuccess())
- return false;
- CurrentTx = Nothing();
- return true;
-}
+ request->MutableQueryCachePolicy()->set_keep_in_cache(true);
-bool TSearchEventsProcessor::QueryPrepared(const TActorContext&) {
- Y_VERIFY(Session.Defined());
- Y_VERIFY(PrepQueryFuture.HasValue());
- auto& value = PrepQueryFuture.GetValueSync();
- if (!value.IsSuccess()) {
- return false;
+ if (readonly) {
+ request->MutableTxControl()->mutable_begin_tx()->mutable_stale_read_only();
+ } else {
+ request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
}
- PreparedQuery = value.GetQuery();
- return true;
-}
-bool TSearchEventsProcessor::QueryComplete(const TActorContext&) {
- Y_VERIFY(Session.Defined());
- Y_VERIFY(QueryResultFuture.HasValue());
- QueryResult = QueryResultFuture.GetValueSync();
- if (!QueryResult.Get()->IsSuccess()) {
- return false;
+ request->MutableTxControl()->set_commit_tx(true);
+ if (params != nullptr) {
+ request->MutableParameters()->Swap(params);
}
- return true;
-}
+ Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+}
void TSearchEventsProcessor::SaveQueueEvent(
-
const TString& queueName, const TQueueEvent& event, const TActorContext& ctx
) {
auto tsIsoString = TInstant::MilliSeconds(event.Timestamp).ToIsoStringLocal();
@@ -483,6 +366,10 @@ void TSearchEventsProcessor::SaveQueueEvent(
SendJson(ss.Str(), ctx);
}
+void TSearchEventsProcessor::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
+ Die(ctx);
+}
+
void IEventsWriterWrapper::Close() {
if (!Closed) {
Closed = true;
diff --git a/ydb/core/ymq/actor/index_events_processor.h b/ydb/core/ymq/actor/index_events_processor.h
index a8ae462064..fd75e6dded 100644
--- a/ydb/core/ymq/actor/index_events_processor.h
+++ b/ydb/core/ymq/actor/index_events_processor.h
@@ -2,6 +2,9 @@
#include "events.h"
#include <ydb/core/ymq/base/events_writer_iface.h>
+#include <ydb/core/kqp/kqp.h>
+#include <ydb/public/lib/deprecated/kicli/kicli.h>
+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -21,7 +24,7 @@ friend class TIndexProcesorTests;
public:
TSearchEventsProcessor(const TString& root, const TDuration& reindexInterval, const TDuration& rescanInterval,
- const TSimpleSharedPtr<NYdb::NTable::TTableClient>& tableClient,
+ const TString& database,
IEventsWriterWrapper::TPtr eventsWriter,
bool waitForWake = false);
@@ -36,23 +39,14 @@ private:
TDuration RescanInterval;
TDuration ReindexInterval;
TInstant LastReindex = TInstant::Zero();
- TSimpleSharedPtr<NYdb::NTable::TTableClient> TableClient;
IEventsWriterWrapper::TPtr EventsWriter;
- NYdb::TAsyncStatus Status;
- NYdb::NTable::TAsyncCreateSessionResult SessionFuture;
- NYdb::NTable::TAsyncBeginTransactionResult TxFuture;
- NYdb::NTable::TAsyncCommitTransactionResult CommitTxFuture;
- NYdb::NTable::TAsyncPrepareQueryResult PrepQueryFuture;
- NYdb::NTable::TAsyncDataQueryResult QueryResultFuture;
+ TString Database;
+
+ TString SessionId;
- TMaybe<NYdb::NTable::TSession> Session;
- TMaybe<NYdb::NTable::TDataQuery> PreparedQuery;
- TMaybe<NYdb::NTable::TTransaction> CurrentTx;
- TMaybe<NYdb::NTable::TDataQueryResult> QueryResult;
bool WaitForWake = false;
- bool Stopping = false;
enum class EQueueEventType {
Deleted = 0,
Created = 1,
@@ -76,73 +70,56 @@ private:
TAtomicCounter ReindexComplete = 0;
- STATEFN(StateFunc);
+ STRICT_STFUNC(StateFunc,
+ HFunc(NActors::TEvents::TEvWakeup, HandleWakeup);
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
+ HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse);
+ HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
+ IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse);
+ )
+
+ void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
+ void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx);
+
void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
+ void HandleFailure(const TActorContext& ctx);
+
void StartQueuesListing(const TActorContext& ctx);
- void OnQueuesListSessionReady(const TActorContext& ctx);
- void OnQueuesListPrepared(const TActorContext& ctx);
- void RunQueuesListQuery(const TActorContext& ctx, bool initial = false);
- void OnQueuesListQueryComplete(const TActorContext& ctx);
+ void RunQueuesListQuery(const TActorContext& ctx);
+ void OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
void RunEventsListing(const TActorContext& ctx);
- void OnEventsListingDone(const TActorContext& ctx);
- void StartCleanupSession(const TActorContext& ctx);
- void OnCleanupSessionReady(const TActorContext& ctx);
- void OnCleanupTxReady(const TActorContext& ctx);
- void OnCleanupPrepared(const TActorContext& ctx);
+ void OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
void RunEventsCleanup(const TActorContext& ctx);
void OnCleanupQueryComplete(const TActorContext&ctx);
- void OnCleanupTxCommitted(const TActorContext&ctx);
+
+ void RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly,
+ const TActorContext& ctx);
void ProcessEventsQueue(const TActorContext& ctx);
void ProcessReindexIfRequired(const TActorContext& ctx);
void ProcessReindexResult(const TActorContext& ctx);
void WaitNextCycle(const TActorContext& ctx);
+ void HandlePoisonPill(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
+
+ void StopSession(const TActorContext& ctx);
- void StartSession(ui32 evTag, const TActorContext& ctx);
- void PrepareDataQuery(const TString& query, ui32 evTag, const TActorContext& ctx);
- void StartTx(ui32 evTag, const TActorContext& ctx);
- void CommitTx(ui32 evTag, const TActorContext& ctx);
- void RunPrepared(NYdb::TParams&& params, ui32 evTag, const TActorContext& ctx);
- bool SessionStarted(const TActorContext& ctx);
- bool QueryPrepared(const TActorContext& ctx);
- bool QueryComplete(const TActorContext& ctx);
- bool TxStarted(const TActorContext& ctx);
- bool TxCommitted(const TActorContext& ctx);
-
-
- template<class TFutureType>
- void Apply(TFutureType* future, ui32 evTag, const TActorContext& ctx) {
- future->Subscribe(
- [evTag, as = GetActorSystem(), selfId = ctx.SelfID](const auto&)
- {as->Send(selfId, new TEvWakeup(evTag));}
- );
- }
void InitQueries(const TString& root);
void SaveQueueEvent(const TString& queueName, const TQueueEvent& event, const TActorContext& ctx);
void SendJson(const TString& json, const TActorContext &ctx);
static NActors::TActorSystem* GetActorSystem();
- constexpr static ui32 StartQueuesListingTag = 1;
- constexpr static ui32 QueuesListSessionStartedTag = 2;
- constexpr static ui32 QueuesListPreparedTag = 3;
- constexpr static ui32 QueuesListQueryCompleteTag = 4;
-
- constexpr static ui32 RunEventsListingTag = 10;
- constexpr static ui32 EventsListingDoneTag = 11;
-
- constexpr static ui32 StartCleanupTag = 20;
- constexpr static ui32 CleanupSessionReadyTag = 21;
- constexpr static ui32 CleanupTxReadyTag = 22;
- constexpr static ui32 CleanupPreparedTag = 23;
- constexpr static ui32 CleanupQueryCompleteTag = 24;
- constexpr static ui32 CleanupTxCommittedTag = 25;
-
- constexpr static ui32 StopAllTag = 99;
-
+ enum class EState {
+ Initial,
+ QueuesListingExecute,
+ EventsListingExecute,
+ CleanupExecute,
+ Stopping
+ };
+ EState State = EState::Initial;
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index 2757d23a38..ceb39c91a0 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -269,12 +269,10 @@ static TString GetEndpoint(const NKikimrConfig::TSqsConfig& config) {
}
}
-TSqsService::TSqsService(const TMaybe<ui32>& ydbPort) {
- if (ydbPort.Defined()) {
- YcSearchEventsConfig.GrpcPort = *ydbPort;
- }
+TSqsService::TSqsService() {
DebugInfo->SqsServiceActorPtr = this;
}
+
TSqsService::~TSqsService() {
DebugInfo->SqsServiceActorPtr = nullptr;
}
@@ -309,26 +307,21 @@ void TSqsService::Bootstrap() {
RequestSqsUsersList();
RequestSqsQueuesList();
- if (Cfg().HasYcSearchEventsConfig() && YcSearchEventsConfig.GrpcPort) {
+ if (Cfg().HasYcSearchEventsConfig()) {
auto& ycSearchCfg = Cfg().GetYcSearchEventsConfig();
YcSearchEventsConfig.Enabled = ycSearchCfg.GetEnableYcSearch();
YcSearchEventsConfig.ReindexInterval = TDuration::Seconds(ycSearchCfg.GetReindexIntervalSeconds());
YcSearchEventsConfig.RescanInterval = TDuration::Seconds(ycSearchCfg.GetRescanIntervalSeconds());
- auto driverConfig = NYdb::TDriverConfig().SetEndpoint(
- TStringBuilder() << "localhost:" << YcSearchEventsConfig.GrpcPort);
if (ycSearchCfg.HasTenantMode() && ycSearchCfg.GetTenantMode()) {
- driverConfig.SetDatabase(Cfg().GetRoot());
+ YcSearchEventsConfig.Database = Cfg().GetRoot();
YcSearchEventsConfig.TenantMode = true;
}
auto factory = AppData()->SqsAuthFactory;
Y_VERIFY(factory);
- driverConfig.SetCredentialsProviderFactory(factory->CreateCredentialsProviderFactory(Cfg()));
-
- YcSearchEventsConfig.Driver = MakeHolder<NYdb::TDriver>(driverConfig);
MakeAndRegisterYcEventsProcessor();
}
}
@@ -1307,17 +1300,13 @@ void TSqsService::MakeAndRegisterYcEventsProcessor() {
Y_VERIFY(factory);
Register(new TSearchEventsProcessor(
root, YcSearchEventsConfig.ReindexInterval, YcSearchEventsConfig.RescanInterval,
- MakeSimpleShared<NYdb::NTable::TTableClient>(*YcSearchEventsConfig.Driver),
+ YcSearchEventsConfig.Database,
factory->CreateEventsWriter(Cfg(), GetSqsServiceCounters(AppData()->Counters, "yc_unified_agent"))
));
}
-//
-//IActor* CreateSqsService(const TYcSearchEventsConfig& ycSearchEventsConfig) {
-// return new TSqsService(ycSearchEventsConfig);
-//}
-IActor* CreateSqsService(TMaybe<ui32> ydbPort) {
- return new TSqsService(ydbPort);
+IActor* CreateSqsService() {
+ return new TSqsService();
}
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h
index caf091c24b..b351a9eb36 100644
--- a/ydb/core/ymq/actor/service.h
+++ b/ydb/core/ymq/actor/service.h
@@ -22,7 +22,7 @@ class TSqsService
: public TActorBootstrapped<TSqsService>
{
public:
- TSqsService(const TMaybe<ui32>& ydbPort);
+ TSqsService();
~TSqsService();
void Bootstrap();
@@ -161,8 +161,7 @@ private:
struct TYcSearchEventsConfig {
- THolder<NYdb::TDriver> Driver;
- ui32 GrpcPort = 0;
+ TString Database;
bool Enabled = false;
bool TenantMode = false;
TDuration ReindexInterval = TDuration::Hours(4);
diff --git a/ydb/core/ymq/actor/serviceid.h b/ydb/core/ymq/actor/serviceid.h
index 7219d918ba..0df86cdd56 100644
--- a/ydb/core/ymq/actor/serviceid.h
+++ b/ydb/core/ymq/actor/serviceid.h
@@ -31,7 +31,7 @@ inline TActorId MakeSqsMeteringServiceID() {
return TActorId(0, TStringBuf("SQS_METER"));
}
-IActor* CreateSqsService(TMaybe<ui32> ydbPort = Nothing());
+IActor* CreateSqsService();
IActor* CreateSqsProxyService();
IActor* CreateSqsAccessService(const TString& address, const TString& pathToRootCA);
IActor* CreateSqsFolderService(const TString& address, const TString& pathToRootCA);
diff --git a/ydb/core/ymq/actor/ya.make b/ydb/core/ymq/actor/ya.make
index 070653bb57..dae80ad61d 100644
--- a/ydb/core/ymq/actor/ya.make
+++ b/ydb/core/ymq/actor/ya.make
@@ -90,6 +90,7 @@ PEERDIR(
ydb/public/lib/value
ydb/public/sdk/cpp/client/ydb_types/credentials
ydb/library/yql/minikql
+ ydb/public/lib/deprecated/client
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
index e484f117be..55058e83d4 100644
--- a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
+++ b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
@@ -54,7 +54,7 @@ private:
Processor = new TSearchEventsProcessor(
SchemePath,
TDuration::Minutes(10), TDuration::MilliSeconds(100),
- parent->TableClient,
+ TString(),
EventsWriter,
true
);
@@ -71,7 +71,7 @@ private:
}
~TTestRunner() {
if (Processor != nullptr) {
- auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvWakeup(TSearchEventsProcessor::StopAllTag));
+ auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvents::TEvPoisonPill());
Parent->Server->GetRuntime()->Send(handle);
}
}
@@ -228,7 +228,7 @@ private:
auto initialCount = Processor->GetReindexCount();
DispatchOpts.CustomFinalCondition = [initialCount, processor = Processor]() { return processor->GetReindexCount() > initialCount; };
if (!initialCount) {
- auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvWakeup(TSearchEventsProcessor::StartQueuesListingTag));
+ auto handle = new IEventHandle(ProcessorId, TActorId(), new TEvWakeup());
Parent->Server->GetRuntime()->Send(handle);
}
Parent->Server->GetRuntime()->DispatchEvents(DispatchOpts, TDuration::Minutes(1));