diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-06-30 19:03:49 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-06-30 19:03:49 +0300 |
commit | fd58b11067041cacf6feca44fe47f46bc01422dc (patch) | |
tree | b43874aa28eb9adb801623260afe5e89fc3e8646 | |
parent | 81fd9ea2d075516cb0725f85118dcca474cd42e3 (diff) | |
download | ydb-fd58b11067041cacf6feca44fe47f46bc01422dc.tar.gz |
DC awareness
ref:cf26d330761ea0ba13530ba17766b91269d2ab35
-rw-r--r-- | library/cpp/actors/core/interconnect.cpp | 7 | ||||
-rw-r--r-- | library/cpp/actors/core/interconnect.h | 2 | ||||
-rw-r--r-- | ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp | 6 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 44 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.h | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/config/protos/nodes_manager.proto | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp | 13 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/schema.h | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/protos/yq_private.proto | 1 | ||||
-rw-r--r-- | ydb/tests/library/harness/kikimr_config.py | 2 | ||||
-rw-r--r-- | ydb/tests/library/harness/kikimr_runner.py | 13 |
13 files changed, 77 insertions, 16 deletions
diff --git a/library/cpp/actors/core/interconnect.cpp b/library/cpp/actors/core/interconnect.cpp index 9b377cbb00..192fb68769 100644 --- a/library/cpp/actors/core/interconnect.cpp +++ b/library/cpp/actors/core/interconnect.cpp @@ -76,6 +76,13 @@ namespace NActors { : TNodeLocation(ParseLocation(s)) {} + TNodeLocation::TNodeLocation(const TString& DataCenter, const TString& Module, const TString& Rack, const TString& Unit) { + if (DataCenter) Items.emplace_back(TKeys::DataCenter, DataCenter); + if (Module) Items.emplace_back(TKeys::Module, Module); + if (Rack) Items.emplace_back(TKeys::Rack, Rack); + if (Unit) Items.emplace_back(TKeys::Unit, Unit); + } + NActorsInterconnect::TNodeLocation TNodeLocation::ParseLocation(const TString& s) { NActorsInterconnect::TNodeLocation res; const bool success = res.ParseFromString(s); diff --git a/library/cpp/actors/core/interconnect.h b/library/cpp/actors/core/interconnect.h index 5ecac1deec..37fd7bff4d 100644 --- a/library/cpp/actors/core/interconnect.h +++ b/library/cpp/actors/core/interconnect.h @@ -57,6 +57,7 @@ namespace NActors { TNodeLocation() = default; TNodeLocation(const TNodeLocation&) = default; TNodeLocation(TNodeLocation&&) = default; + TNodeLocation(const TString& DataCenter, const TString& Module = "", const TString& Rack = "", const TString& Unit = ""); // protobuf-parser ctor explicit TNodeLocation(const NActorsInterconnect::TNodeLocation& location); @@ -79,6 +80,7 @@ namespace NActors { TString GetDataCenterId() const { return ToStringUpTo(TKeys::DataCenter); } TString GetModuleId() const { return ToStringUpTo(TKeys::Module); } TString GetRackId() const { return ToStringUpTo(TKeys::Rack); } + TString GetUnitId() const { return ToStringUpTo(TKeys::Unit); } TString ToString() const { return ToStringUpTo(TKeys::E(Max<int>())); } TString ToStringUpTo(TKeys::E upToKey) const; diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp index bddc8b499b..41b39def48 100644 --- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp +++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp @@ -643,6 +643,12 @@ protected: } } + if (config.ParseResult->Has("data-center")) { + if (AppConfig.HasYandexQueryConfig()) { + AppConfig.MutableYandexQueryConfig()->MutableNodesManager()->SetDataCenter(to_lower(DataCenter)); + } + } + // MessageBus options. if (!AppConfig.HasMessageBusConfig()) { diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index b8bbe99f75..0e930e518d 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -42,6 +42,7 @@ public: const ::NYql::NCommon::TServiceCounters& serviceCounters, const NConfig::TPrivateApiConfig& privateApiConfig, const ui32& icPort, + const TString& dataCenter, const TString& tenant, ui64 mkqlInitialMemoryLimit) : WorkerManagerCounters(workerManagerCounters) @@ -53,6 +54,7 @@ public: , MkqlInitialMemoryLimit(mkqlInitialMemoryLimit) , YqSharedResources(yqSharedResources) , IcPort(icPort) + , DataCenter(dataCenter) , InternalServiceId(MakeInternalServiceActorId()) { @@ -96,32 +98,39 @@ private: ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic()); TVector<TPeer> nodes; for (ui32 i = 0; i < count; ++i) { - TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; + TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter}; bool selfPlacement = true; if (!Peers.empty()) { auto FirstPeer = NextPeer; while (true) { - if (NextPeer >= Peers.size()) { + auto& nextNode = Peers[NextPeer]; + + if (++NextPeer >= Peers.size()) { NextPeer = 0; } - auto& nextNode = Peers[NextPeer]; - ++NextPeer; - - if (NextPeer == FirstPeer // we closed loop w/o success, fallback to round robin then - || nextNode.MemoryLimit == 0 // not limit defined for the node - || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // memory is enough + if ( (DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match + && ( nextNode.MemoryLimit == 0 // memory is NOT limited + || nextNode.MemoryLimit >= nextNode.MemoryAllocated + MkqlInitialMemoryLimit) // or enough ) { // adjust allocated size to place next tasks correctly, will be reset after next health check nextNode.MemoryAllocated += MkqlInitialMemoryLimit; + if (nextNode.NodeId == SelfId().NodeId()) { + // eventually synced self allocation info + memoryAllocated += MkqlInitialMemoryLimit; + } node = nextNode; selfPlacement = false; break; } + + if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then + break; + } } } if (selfPlacement) { - if (memoryLimit == 0 || memoryLimit > memoryAllocated + MkqlInitialMemoryLimit) { + if (memoryLimit == 0 || memoryLimit >= memoryAllocated + MkqlInitialMemoryLimit) { memoryAllocated += MkqlInitialMemoryLimit; } else { placementFailure = true; @@ -190,6 +199,7 @@ private: node.set_memory_limit(AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic())); node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic())); node.set_interconnect_port(IcPort); + node.set_data_center(DataCenter); Send(InternalServiceId, new TEvInternalService::TEvHealthCheckRequest(request)); } @@ -211,9 +221,16 @@ private: nodesInfo.reserve(res.nodes().size()); Peers.clear(); + std::set<ui32> nodeIds; // may be not unique for (const auto& node : res.nodes()) { + + if (nodeIds.contains(node.node_id())) { + continue; + } + nodeIds.insert(node.node_id()); + Peers.push_back({node.node_id(), node.instance_id() + "," + node.hostname(), - node.active_workers(), node.memory_limit(), node.memory_allocated()}); + node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()}); if (node.interconnect_port()) { nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{ @@ -222,7 +239,7 @@ private: node.hostname(), // host node.hostname(), // resolveHost static_cast<ui16>(node.interconnect_port()), - /* NodeLocation = */{}}); + TNodeLocation(node.data_center())}); } } @@ -251,6 +268,7 @@ private: NYq::TYqSharedResources::TPtr YqSharedResources; const ui32 IcPort; // Interconnect Port + TString DataCenter; struct TPeer { ui32 NodeId; @@ -258,6 +276,7 @@ private: ui64 ActiveWorkers; ui64 MemoryLimit; ui64 MemoryAllocated; + TString DataCenter; }; TVector<TPeer> Peers; ui32 ResourceIdPart = 0; @@ -279,11 +298,12 @@ IActor* CreateNodesManager( const NConfig::TPrivateApiConfig& privateApiConfig, const NYq::TYqSharedResources::TPtr& yqSharedResources, const ui32& icPort, + const TString& dataCenter, const TString& tenant, ui64 mkqlInitialMemoryLimit) { return new TNodesManagerActor(yqSharedResources, workerManagerCounters, timeProvider, randomProvider, - serviceCounters, privateApiConfig, icPort, tenant, mkqlInitialMemoryLimit); + serviceCounters, privateApiConfig, icPort, dataCenter, tenant, mkqlInitialMemoryLimit); } } // namespace NYq diff --git a/ydb/core/yq/libs/actors/nodes_manager.h b/ydb/core/yq/libs/actors/nodes_manager.h index 111181c2b8..4a9b47f8b9 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.h +++ b/ydb/core/yq/libs/actors/nodes_manager.h @@ -28,6 +28,7 @@ IActor* CreateNodesManager( const NConfig::TPrivateApiConfig& privateApiConfig, const NYq::TYqSharedResources::TPtr& yqSharedResources, const ui32& icPort, + const TString& dataCenter = "", const TString& tenant = "", ui64 mkqlInitialMemoryLimit = 0); diff --git a/ydb/core/yq/libs/config/protos/nodes_manager.proto b/ydb/core/yq/libs/config/protos/nodes_manager.proto index cfc1073cae..5b390d3ba0 100644 --- a/ydb/core/yq/libs/config/protos/nodes_manager.proto +++ b/ydb/core/yq/libs/config/protos/nodes_manager.proto @@ -10,4 +10,5 @@ message TNodesManagerConfig { bool Enabled = 1; uint32 Port = 2; string Host = 3; + string DataCenter = 4; } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp index 733828438d..5fd2f1fdf3 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp @@ -22,6 +22,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth const ui64 memoryLimit = node.memory_limit(); const ui64 memoryAllocated = node.memory_allocated(); const ui32 icPort = node.interconnect_port(); + const TString dataCenter = node.data_center(); const TString nodeAddress = node.node_address(); const auto ttl = TDuration::Seconds(5); const auto deadline = startTime + ttl * 3; @@ -47,6 +48,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth node->set_memory_limit(memoryLimit); node->set_memory_allocated(memoryAllocated); node->set_node_address(nodeAddress); + node->set_data_center(dataCenter); } TSqlQueryBuilder readQueryBuilder(YdbConnection->TablePathPrefix, "NodesHealthCheck(read)"); @@ -54,7 +56,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth readQueryBuilder.AddString("tenant", tenant); readQueryBuilder.AddText( "SELECT `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`, `" MEMORY_LIMIT_COLUMN_NAME"`, " - "`" MEMORY_ALLOCATED_COLUMN_NAME"`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "` FROM `" NODES_TABLE_NAME "`\n" + "`" MEMORY_ALLOCATED_COLUMN_NAME"`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME"`, `" DATA_CENTER_COLUMN_NAME "` FROM `" NODES_TABLE_NAME "`\n" "WHERE `" TENANT_COLUMN_NAME"` = $tenant AND `" EXPIRE_AT_COLUMN_NAME "` >= $now;\n" ); @@ -73,6 +75,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth node->set_memory_allocated(*parser.ColumnParser(MEMORY_ALLOCATED_COLUMN_NAME).GetOptionalUint64()); node->set_interconnect_port(parser.ColumnParser(INTERCONNECT_PORT_COLUMN_NAME).GetOptionalUint32().GetOrElse(0)); node->set_node_address(*parser.ColumnParser(NODE_ADDRESS_COLUMN_NAME).GetOptionalString()); + node->set_data_center(*parser.ColumnParser(DATA_CENTER_COLUMN_NAME).GetOptionalString()); } } } @@ -88,11 +91,15 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth writeQueryBuilder.AddUint64("memory_allocated", memoryAllocated); writeQueryBuilder.AddUint32("ic_port", icPort); writeQueryBuilder.AddString("node_address", nodeAddress); + writeQueryBuilder.AddString("data_center", dataCenter); writeQueryBuilder.AddText( "UPSERT INTO `" NODES_TABLE_NAME "`\n" "(`" TENANT_COLUMN_NAME "`, `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`,\n" - "`" HOST_NAME_COLUMN_NAME "`, `" EXPIRE_AT_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`, `" MEMORY_LIMIT_COLUMN_NAME"`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "`)\n" - "VALUES ($tenant ,$node_id, $instance_id, $hostname, $deadline, $active_workers, $memory_limit, $memory_allocated, $ic_port, $node_address);\n" + "`" HOST_NAME_COLUMN_NAME "`, `" EXPIRE_AT_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`,\n" + "`" MEMORY_LIMIT_COLUMN_NAME "`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`,\n" + "`" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME"`)\n" + "VALUES ($tenant ,$node_id, $instance_id, $hostname, $deadline, $active_workers, $memory_limit,\n" + "$memory_allocated, $ic_port, $node_address, $data_center);\n" ); const auto writeQuery = writeQueryBuilder.Build(); return std::make_pair(writeQuery.Sql, writeQuery.Params); diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h index f4f2fea21c..344be98e12 100644 --- a/ydb/core/yq/libs/control_plane_storage/schema.h +++ b/ydb/core/yq/libs/control_plane_storage/schema.h @@ -63,6 +63,7 @@ namespace NYq { #define MEMORY_ALLOCATED_COLUMN_NAME "memory_allocated" #define INTERCONNECT_PORT_COLUMN_NAME "interconnect_port" #define NODE_ADDRESS_COLUMN_NAME "node_address" +#define DATA_CENTER_COLUMN_NAME "data_center" #define HOST_NAME_COLUMN_NAME "hostname" #define OWNER_COLUMN_NAME "owner" diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index da842c919a..72189f65f3 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -169,6 +169,7 @@ void TYdbControlPlaneStorageActor::CreateNodesTable() .AddNullableColumn(EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp) .AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32) .AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String) + .AddNullableColumn(DATA_CENTER_COLUMN_NAME, EPrimitiveType::String) .SetTtlSettings(EXPIRE_AT_COLUMN_NAME) .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME}) .Build(); diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 4826f78de9..15a0524d12 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -201,6 +201,7 @@ void Init( protoConfig.GetPrivateApi(), yqSharedResources, icPort, + protoConfig.GetNodesManager().GetDataCenter(), tenant, mkqlInitialMemoryLimit); diff --git a/ydb/core/yq/libs/protos/yq_private.proto b/ydb/core/yq/libs/protos/yq_private.proto index 0af8d46c22..a67c0db9ab 100644 --- a/ydb/core/yq/libs/protos/yq_private.proto +++ b/ydb/core/yq/libs/protos/yq_private.proto @@ -150,6 +150,7 @@ message NodeInfo { uint64 memory_allocated = 6; uint32 interconnect_port = 7; string node_address = 8; + string data_center = 9; } message NodesHealthCheckRequest { diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index 12ff9e154f..02f92a9c44 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -136,6 +136,7 @@ class KikimrConfigGenerator(object): bs_cache_file_path=None, yq_tenant=None, use_legacy_pq=False, + dc_mapping={}, ): self._version = version self.use_log_files = use_log_files @@ -197,6 +198,7 @@ class KikimrConfigGenerator(object): self.__output_path = output_path or yatest_common.output_path() self.node_kind = node_kind self.yq_tenant = yq_tenant + self.dc_mapping = dc_mapping self.__bs_cache_file_path = bs_cache_file_path diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py index fcb15f49cd..d7c43575de 100644 --- a/ydb/tests/library/harness/kikimr_runner.py +++ b/ydb/tests/library/harness/kikimr_runner.py @@ -48,10 +48,11 @@ def join(a, b): class KiKiMRNode(daemon.Daemon, kikimr_node_interface.NodeInterface): def __init__(self, node_idx, config_path, port_allocator, cluster_name, configurator, udfs_dir=None, role='node', node_broker_port=None, tenant_affiliation=None, encryption_key=None, - binary_path=None): + binary_path=None, data_center=None): super(kikimr_node_interface.NodeInterface, self).__init__() self.node_id = node_idx + self.data_center = data_center self.__cwd = None self.__config_path = config_path self.__cluster_name = cluster_name @@ -180,6 +181,11 @@ class KiKiMRNode(daemon.Daemon, kikimr_node_interface.NodeInterface): if self.sqs_port is not None: command.extend(["--sqs-port=%d" % self.sqs_port]) + if self.data_center is not None: + command.append( + "--data-center=%s" % self.data_center + ) + logger.info('CFG_DIR_PATH="%s"', self.__config_path) logger.info("Final command: %s", ' '.join(command).replace(self.__config_path, '$CFG_DIR_PATH')) return command @@ -357,6 +363,10 @@ class KiKiMR(kikimr_cluster_interface.KiKiMRClusterInterface): def __register_node(self): node_index = next(self._node_index_allocator) + data_center = None + if isinstance(self.__configurator.dc_mapping, dict): + if node_index in self.__configurator.dc_mapping: + data_center = self.__configurator.dc_mapping[node_index] self._nodes[node_index] = KiKiMRNode( node_index, self.config_path, @@ -365,6 +375,7 @@ class KiKiMR(kikimr_cluster_interface.KiKiMRClusterInterface): configurator=self.__configurator, udfs_dir=self.__common_udfs_dir, tenant_affiliation=self.__configurator.yq_tenant, + data_center=data_center, ) return self._nodes[node_index] |