aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakostrov <makostrov@yandex-team.com>2023-08-11 13:22:50 +0300
committermakostrov <makostrov@yandex-team.com>2023-08-11 14:36:57 +0300
commit9e6050fec916ace48ca19e7025f3275eb1f10d60 (patch)
tree401180ae2f0fc4e075dec07a0018194f986deed1
parentc481f0ed6f2e82833bc862f6993a3aae37595ed5 (diff)
downloadydb-9e6050fec916ace48ca19e7025f3275eb1f10d60.tar.gz
Change Result -> YdbResult in YMQ
KIKIMR-17490
-rw-r--r--ydb/core/ymq/actor/cleanup_queue_data.cpp41
-rw-r--r--ydb/core/ymq/actor/cleanup_queue_data.h4
-rw-r--r--ydb/core/ymq/actor/monitoring.cpp13
-rw-r--r--ydb/core/ymq/base/run_query.cpp1
4 files changed, 32 insertions, 27 deletions
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp
index 2ca6558479c..41fb4d37275 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.cpp
+++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp
@@ -132,26 +132,29 @@ namespace NKikimr::NSQS {
}
case EState::GetQueueAfterLockUpdate:
case EState::GetQueue: {
- Y_VERIFY(response.GetResults().size() == 1);
- const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
- if (rr.GetList().empty()) {
+ Y_VERIFY(response.YdbResultsSize() == 1);
+ NYdb::TResultSetParser parser(response.GetYdbResults(0));
+ if (parser.RowsCount() == 0) {
LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] there are no queues to delete");
LockQueueToRemove(IDLE_TIMEOUT, ctx);
return;
}
- Y_VERIFY(rr.GetList().size() == 1);
- const auto& row = rr.GetList()[0];
+ Y_VERIFY(parser.RowsCount() == 1);
+ parser.TryNextRow();
if (State == EState::GetQueueAfterLockUpdate) {
- ContinueRemoveData(row, ctx);
+ ContinueRemoveData(parser, ctx);
} else {
- StartRemoveData(row, ctx);
+ StartRemoveData(parser, ctx);
}
return;
}
case EState::RemoveData: {
- Y_VERIFY(response.GetResults().size() == 1);
- const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
- ui64 removedRows = rr.GetList()[0].GetStruct(0).GetUint64();
+ Y_VERIFY(response.YdbResultsSize() == 1);
+ NYdb::TResultSetParser parser(response.GetYdbResults(0));
+ ui64 removedRows = 0;
+ if (parser.TryNextRow()) {
+ removedRows = parser.ColumnParser(0).GetUint64();
+ }
OnRemovedData(removedRows, ctx);
break;
}
@@ -226,13 +229,13 @@ namespace NKikimr::NSQS {
RunYqlQuery(UpdateLockQueueQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx);
}
- void TCleanupQueueDataActor::ContinueRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx) {
+ void TCleanupQueueDataActor::ContinueRemoveData(NYdb::TResultSetParser& parser, const TActorContext& ctx) {
// Select RemoveTimestamp, QueueIdNumber, FifoQueue, Shards, TablesFormat
- ui64 queueIdNumber = queueRow.GetStruct(1).GetOptional().GetUint64();
+ ui64 queueIdNumber = *parser.ColumnParser(1).GetOptionalUint64();
if (queueIdNumber != QueueIdNumber) {
LOG_WARN_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to continue remove data queue_id_number=" << queueIdNumber
<< ", but was locked queue_id_number=" << QueueIdNumber);
- StartRemoveData(queueRow, ctx);
+ StartRemoveData(parser, ctx);
return;
}
@@ -240,16 +243,16 @@ namespace NKikimr::NSQS {
RunRemoveData(ctx);
}
- void TCleanupQueueDataActor::StartRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx) {
+ void TCleanupQueueDataActor::StartRemoveData(NYdb::TResultSetParser& parser, const TActorContext& ctx) {
State = EState::RemoveData;
ClearedTablesCount = 0;
// Select RemoveTimestamp, QueueIdNumber, FifoQueue, Shards, TablesFormat
- RemoveQueueTimetsamp = queueRow.GetStruct(0).GetOptional().GetUint64();
- QueueIdNumber = queueRow.GetStruct(1).GetOptional().GetUint64();
- IsFifoQueue = queueRow.GetStruct(2).GetOptional().GetBool();
- Shards = queueRow.GetStruct(3).GetOptional().GetUint32();
- TablesFormat = queueRow.GetStruct(4).GetOptional().GetUint32();
+ RemoveQueueTimetsamp = *parser.ColumnParser(0).GetOptionalUint64();
+ QueueIdNumber = *parser.ColumnParser(1).GetOptionalUint64();
+ IsFifoQueue = *parser.ColumnParser(2).GetOptionalBool();
+ Shards = *parser.ColumnParser(3).GetOptionalUint32();
+ TablesFormat = *parser.ColumnParser(4).GetOptionalUint32();
LOG_INFO_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to remove data: removed at " << RemoveQueueTimetsamp
<< " queue_id_number=" << QueueIdNumber << " tables_format=" << TablesFormat);
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.h b/ydb/core/ymq/actor/cleanup_queue_data.h
index 0104276f03b..b6168d6f022 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.h
+++ b/ydb/core/ymq/actor/cleanup_queue_data.h
@@ -42,8 +42,8 @@ public:
void HandleError(const TString& error, const TActorContext& ctx);
void LockQueueToRemove(TDuration runAfter, const TActorContext& ctx);
void UpdateLock(const TActorContext& ctx);
- void ContinueRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx);
- void StartRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx);
+ void ContinueRemoveData(NYdb::TResultSetParser& queueRow, const TActorContext& ctx);
+ void StartRemoveData(NYdb::TResultSetParser& queueRow, const TActorContext& ctx);
std::optional<std::pair<TString, bool>> GetNextTable() const;
void ClearNextTable(const TActorContext& ctx);
diff --git a/ydb/core/ymq/actor/monitoring.cpp b/ydb/core/ymq/actor/monitoring.cpp
index 3f0d5698257..3df11cdb612 100644
--- a/ydb/core/ymq/actor/monitoring.cpp
+++ b/ydb/core/ymq/actor/monitoring.cpp
@@ -47,18 +47,19 @@ namespace NKikimr::NSQS {
RetryPeriod = RETRY_PERIOD_MIN;
auto& response = record.GetResponse();
- Y_VERIFY(response.GetResults().size() == 1);
- const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
+ Y_VERIFY(response.YdbResultsSize() == 1);
+ NYdb::TResultSetParser parser(response.GetYdbResults(0));
TDuration removeQueuesDataLag;
- if (!rr.GetList().empty()) {
- TInstant minRemoveQueueTimestamp = TInstant::MilliSeconds(rr.GetList()[0].GetStruct(0).GetOptional().GetUint64());
+ if (parser.RowsCount()) {
+ parser.TryNextRow();
+ TInstant minRemoveQueueTimestamp = TInstant::MilliSeconds(*parser.ColumnParser(0).GetOptionalUint64());
removeQueuesDataLag = ctx.Now() - minRemoveQueueTimestamp;
}
- LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", count: " << rr.GetList().size());
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", count: " << parser.RowsCount());
*Counters->CleanupRemovedQueuesLagSec = removeQueuesDataLag.Seconds();
- *Counters->CleanupRemovedQueuesLagCount = rr.GetList().size();
+ *Counters->CleanupRemovedQueuesLagCount = parser.RowsCount();
RequestMetrics(RetryPeriod, ctx);
}
diff --git a/ydb/core/ymq/base/run_query.cpp b/ydb/core/ymq/base/run_query.cpp
index fdc2fff0983..66c037b1d3a 100644
--- a/ydb/core/ymq/base/run_query.cpp
+++ b/ydb/core/ymq/base/run_query.cpp
@@ -19,6 +19,7 @@ namespace NKikimr::NSQS {
request->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
request->SetKeepSession(false);
request->SetQuery(query);
+ request->SetUsePublicResponseDataFormat(true);
if (database) {
request->SetDatabase(database);