diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/System/StorageSystemKafkaConsumers.cpp | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/System/StorageSystemKafkaConsumers.cpp')
-rw-r--r-- | contrib/clickhouse/src/Storages/System/StorageSystemKafkaConsumers.cpp | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/System/StorageSystemKafkaConsumers.cpp b/contrib/clickhouse/src/Storages/System/StorageSystemKafkaConsumers.cpp new file mode 100644 index 0000000000..50f0e5ff88 --- /dev/null +++ b/contrib/clickhouse/src/Storages/System/StorageSystemKafkaConsumers.cpp @@ -0,0 +1,175 @@ +#include "clickhouse_config.h" + +#if USE_RDKAFKA + +#include <DataTypes/DataTypeArray.h> +#include <DataTypes/DataTypeString.h> +#include <DataTypes/DataTypesNumber.h> +#include <DataTypes/DataTypeDateTime.h> +#include <DataTypes/DataTypeDateTime64.h> +#include <DataTypes/DataTypeUUID.h> +#include <Columns/ColumnString.h> +#include <Columns/ColumnsNumber.h> +#include <Columns/ColumnsDateTime.h> +#include <Access/ContextAccess.h> +#include <Storages/System/StorageSystemKafkaConsumers.h> +#error #include <Storages/Kafka/StorageKafka.h> + +#include <Interpreters/Context.h> +#include <Interpreters/DatabaseCatalog.h> +#include "base/types.h" + +namespace DB +{ + +NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes() +{ + NamesAndTypesList names_and_types{ + {"database", std::make_shared<DataTypeString>()}, + {"table", std::make_shared<DataTypeString>()}, + {"consumer_id", std::make_shared<DataTypeString>()}, //(number? or string? - single clickhouse table can have many consumers) + {"assignments.topic", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, + {"assignments.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt32>())}, + {"assignments.current_offset", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>())}, + {"exceptions.time", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}, + {"exceptions.text", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, + {"last_poll_time", std::make_shared<DataTypeDateTime>()}, + {"num_messages_read", std::make_shared<DataTypeUInt64>()}, + {"last_commit_time", std::make_shared<DataTypeDateTime>()}, + {"num_commits", std::make_shared<DataTypeUInt64>()}, + {"last_rebalance_time", std::make_shared<DataTypeDateTime>()}, + {"num_rebalance_revocations", std::make_shared<DataTypeUInt64>()}, + {"num_rebalance_assignments", std::make_shared<DataTypeUInt64>()}, + {"is_currently_used", std::make_shared<DataTypeUInt8>()}, + {"rdkafka_stat", std::make_shared<DataTypeString>()}, + }; + return names_and_types; +} + +void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const +{ + auto tables_mark_dropped = DatabaseCatalog::instance().getTablesMarkedDropped(); + + size_t index = 0; + + + auto & database = assert_cast<ColumnString &>(*res_columns[index++]); + auto & table = assert_cast<ColumnString &>(*res_columns[index++]); + auto & consumer_id = assert_cast<ColumnString &>(*res_columns[index++]); //(number? or string? - single clickhouse table can have many consumers) + + auto & assigments_topics = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData()); + auto & assigments_topics_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets(); + + auto & assigments_partition_id = assert_cast<ColumnInt32 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData()); + auto & assigments_partition_id_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets(); + + auto & assigments_current_offset = assert_cast<ColumnInt64 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData()); + auto & assigments_current_offset_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets(); + + auto & exceptions_time = assert_cast<ColumnDateTime &>(assert_cast<ColumnArray &>(*res_columns[index]).getData()); + auto & exceptions_time_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets(); + auto & exceptions_text = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData()); + auto & exceptions_text_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets(); + auto & last_poll_time = assert_cast<ColumnDateTime &>(*res_columns[index++]); + auto & num_messages_read = assert_cast<ColumnUInt64 &>(*res_columns[index++]); + auto & last_commit_time = assert_cast<ColumnDateTime &>(*res_columns[index++]); + auto & num_commits = assert_cast<ColumnUInt64 &>(*res_columns[index++]); + auto & last_rebalance_time = assert_cast<ColumnDateTime &>(*res_columns[index++]); + auto & num_rebalance_revocations = assert_cast<ColumnUInt64 &>(*res_columns[index++]); + auto & num_rebalance_assigments = assert_cast<ColumnUInt64 &>(*res_columns[index++]); + auto & is_currently_used = assert_cast<ColumnUInt8 &>(*res_columns[index++]); + auto & rdkafka_stat = assert_cast<ColumnString &>(*res_columns[index++]); + + const auto access = context->getAccess(); + size_t last_assignment_num = 0; + size_t exceptions_num = 0; + + auto add_row = [&](const DatabaseTablesIteratorPtr & it, StorageKafka * storage_kafka_ptr) + { + if (!access->isGranted(AccessType::SHOW_TABLES, it->databaseName(), it->name())) + { + return; + } + + std::string database_str = it->databaseName(); + std::string table_str = it->name(); + + auto safe_consumers = storage_kafka_ptr->getSafeConsumers(); + + for (const auto & weak_consumer : safe_consumers.consumers) + { + if (auto consumer = weak_consumer.lock()) + { + auto consumer_stat = consumer->getStat(); + + database.insertData(database_str.data(), database_str.size()); + table.insertData(table_str.data(), table_str.size()); + + consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size()); + + const auto num_assignnemts = consumer_stat.assignments.size(); + + for (size_t num = 0; num < num_assignnemts; ++num) + { + const auto & assign = consumer_stat.assignments[num]; + + assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size()); + + assigments_partition_id.insert(assign.partition_id); + assigments_current_offset.insert(assign.current_offset); + } + last_assignment_num += num_assignnemts; + + assigments_topics_offsets.push_back(last_assignment_num); + assigments_partition_id_offsets.push_back(last_assignment_num); + assigments_current_offset_offsets.push_back(last_assignment_num); + + for (const auto & exc : consumer_stat.exceptions_buffer) + { + exceptions_text.insertData(exc.text.data(), exc.text.size()); + exceptions_time.insert(exc.timestamp_usec); + } + exceptions_num += consumer_stat.exceptions_buffer.size(); + exceptions_text_offset.push_back(exceptions_num); + exceptions_time_offset.push_back(exceptions_num); + + + last_poll_time.insert(consumer_stat.last_poll_time); + num_messages_read.insert(consumer_stat.num_messages_read); + last_commit_time.insert(consumer_stat.last_commit_timestamp_usec); + num_commits.insert(consumer_stat.num_commits); + last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec); + + num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations); + num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments); + + is_currently_used.insert(consumer_stat.in_use); + + rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size()); + } + } + }; + + const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES); + + if (show_tables_granted) + { + auto databases = DatabaseCatalog::instance().getDatabases(); + for (const auto & db : databases) + { + for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) + { + StoragePtr storage = iterator->table(); + if (auto * kafka_table = dynamic_cast<StorageKafka *>(storage.get())) + { + add_row(iterator, kafka_table); + } + } + } + + } +} + +} + +#endif |