diff options
author | makostrov <makostrov@yandex-team.com> | 2023-08-11 13:22:50 +0300 |
---|---|---|
committer | makostrov <makostrov@yandex-team.com> | 2023-08-11 14:36:57 +0300 |
commit | 9e6050fec916ace48ca19e7025f3275eb1f10d60 (patch) | |
tree | 401180ae2f0fc4e075dec07a0018194f986deed1 | |
parent | c481f0ed6f2e82833bc862f6993a3aae37595ed5 (diff) | |
download | ydb-9e6050fec916ace48ca19e7025f3275eb1f10d60.tar.gz |
Change Result -> YdbResult in YMQ
KIKIMR-17490
-rw-r--r-- | ydb/core/ymq/actor/cleanup_queue_data.cpp | 41 | ||||
-rw-r--r-- | ydb/core/ymq/actor/cleanup_queue_data.h | 4 | ||||
-rw-r--r-- | ydb/core/ymq/actor/monitoring.cpp | 13 | ||||
-rw-r--r-- | ydb/core/ymq/base/run_query.cpp | 1 |
4 files changed, 32 insertions, 27 deletions
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp index 2ca6558479c..41fb4d37275 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.cpp +++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp @@ -132,26 +132,29 @@ namespace NKikimr::NSQS { } case EState::GetQueueAfterLockUpdate: case EState::GetQueue: { - Y_VERIFY(response.GetResults().size() == 1); - const auto& rr = response.GetResults(0).GetValue().GetStruct(0); - if (rr.GetList().empty()) { + Y_VERIFY(response.YdbResultsSize() == 1); + NYdb::TResultSetParser parser(response.GetYdbResults(0)); + if (parser.RowsCount() == 0) { LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] there are no queues to delete"); LockQueueToRemove(IDLE_TIMEOUT, ctx); return; } - Y_VERIFY(rr.GetList().size() == 1); - const auto& row = rr.GetList()[0]; + Y_VERIFY(parser.RowsCount() == 1); + parser.TryNextRow(); if (State == EState::GetQueueAfterLockUpdate) { - ContinueRemoveData(row, ctx); + ContinueRemoveData(parser, ctx); } else { - StartRemoveData(row, ctx); + StartRemoveData(parser, ctx); } return; } case EState::RemoveData: { - Y_VERIFY(response.GetResults().size() == 1); - const auto& rr = response.GetResults(0).GetValue().GetStruct(0); - ui64 removedRows = rr.GetList()[0].GetStruct(0).GetUint64(); + Y_VERIFY(response.YdbResultsSize() == 1); + NYdb::TResultSetParser parser(response.GetYdbResults(0)); + ui64 removedRows = 0; + if (parser.TryNextRow()) { + removedRows = parser.ColumnParser(0).GetUint64(); + } OnRemovedData(removedRows, ctx); break; } @@ -226,13 +229,13 @@ namespace NKikimr::NSQS { RunYqlQuery(UpdateLockQueueQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx); } - void TCleanupQueueDataActor::ContinueRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx) { + void TCleanupQueueDataActor::ContinueRemoveData(NYdb::TResultSetParser& parser, const TActorContext& ctx) { // Select RemoveTimestamp, QueueIdNumber, FifoQueue, Shards, TablesFormat - ui64 queueIdNumber = queueRow.GetStruct(1).GetOptional().GetUint64(); + ui64 queueIdNumber = *parser.ColumnParser(1).GetOptionalUint64(); if (queueIdNumber != QueueIdNumber) { LOG_WARN_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to continue remove data queue_id_number=" << queueIdNumber << ", but was locked queue_id_number=" << QueueIdNumber); - StartRemoveData(queueRow, ctx); + StartRemoveData(parser, ctx); return; } @@ -240,16 +243,16 @@ namespace NKikimr::NSQS { RunRemoveData(ctx); } - void TCleanupQueueDataActor::StartRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx) { + void TCleanupQueueDataActor::StartRemoveData(NYdb::TResultSetParser& parser, const TActorContext& ctx) { State = EState::RemoveData; ClearedTablesCount = 0; // Select RemoveTimestamp, QueueIdNumber, FifoQueue, Shards, TablesFormat - RemoveQueueTimetsamp = queueRow.GetStruct(0).GetOptional().GetUint64(); - QueueIdNumber = queueRow.GetStruct(1).GetOptional().GetUint64(); - IsFifoQueue = queueRow.GetStruct(2).GetOptional().GetBool(); - Shards = queueRow.GetStruct(3).GetOptional().GetUint32(); - TablesFormat = queueRow.GetStruct(4).GetOptional().GetUint32(); + RemoveQueueTimetsamp = *parser.ColumnParser(0).GetOptionalUint64(); + QueueIdNumber = *parser.ColumnParser(1).GetOptionalUint64(); + IsFifoQueue = *parser.ColumnParser(2).GetOptionalBool(); + Shards = *parser.ColumnParser(3).GetOptionalUint32(); + TablesFormat = *parser.ColumnParser(4).GetOptionalUint32(); LOG_INFO_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to remove data: removed at " << RemoveQueueTimetsamp << " queue_id_number=" << QueueIdNumber << " tables_format=" << TablesFormat); diff --git a/ydb/core/ymq/actor/cleanup_queue_data.h b/ydb/core/ymq/actor/cleanup_queue_data.h index 0104276f03b..b6168d6f022 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.h +++ b/ydb/core/ymq/actor/cleanup_queue_data.h @@ -42,8 +42,8 @@ public: void HandleError(const TString& error, const TActorContext& ctx);
void LockQueueToRemove(TDuration runAfter, const TActorContext& ctx);
void UpdateLock(const TActorContext& ctx);
- void ContinueRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx);
- void StartRemoveData(const NKikimrMiniKQL::TValue& queueRow, const TActorContext& ctx);
+ void ContinueRemoveData(NYdb::TResultSetParser& queueRow, const TActorContext& ctx);
+ void StartRemoveData(NYdb::TResultSetParser& queueRow, const TActorContext& ctx);
std::optional<std::pair<TString, bool>> GetNextTable() const;
void ClearNextTable(const TActorContext& ctx);
diff --git a/ydb/core/ymq/actor/monitoring.cpp b/ydb/core/ymq/actor/monitoring.cpp index 3f0d5698257..3df11cdb612 100644 --- a/ydb/core/ymq/actor/monitoring.cpp +++ b/ydb/core/ymq/actor/monitoring.cpp @@ -47,18 +47,19 @@ namespace NKikimr::NSQS { RetryPeriod = RETRY_PERIOD_MIN;
auto& response = record.GetResponse();
- Y_VERIFY(response.GetResults().size() == 1);
- const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
+ Y_VERIFY(response.YdbResultsSize() == 1);
+ NYdb::TResultSetParser parser(response.GetYdbResults(0));
TDuration removeQueuesDataLag;
- if (!rr.GetList().empty()) {
- TInstant minRemoveQueueTimestamp = TInstant::MilliSeconds(rr.GetList()[0].GetStruct(0).GetOptional().GetUint64());
+ if (parser.RowsCount()) {
+ parser.TryNextRow();
+ TInstant minRemoveQueueTimestamp = TInstant::MilliSeconds(*parser.ColumnParser(0).GetOptionalUint64());
removeQueuesDataLag = ctx.Now() - minRemoveQueueTimestamp;
}
- LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", count: " << rr.GetList().size());
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", count: " << parser.RowsCount());
*Counters->CleanupRemovedQueuesLagSec = removeQueuesDataLag.Seconds();
- *Counters->CleanupRemovedQueuesLagCount = rr.GetList().size();
+ *Counters->CleanupRemovedQueuesLagCount = parser.RowsCount();
RequestMetrics(RetryPeriod, ctx);
}
diff --git a/ydb/core/ymq/base/run_query.cpp b/ydb/core/ymq/base/run_query.cpp index fdc2fff0983..66c037b1d3a 100644 --- a/ydb/core/ymq/base/run_query.cpp +++ b/ydb/core/ymq/base/run_query.cpp @@ -19,6 +19,7 @@ namespace NKikimr::NSQS { request->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
request->SetKeepSession(false);
request->SetQuery(query);
+ request->SetUsePublicResponseDataFormat(true);
if (database) {
request->SetDatabase(database);
|