path: root/library/go/maxprocs
author    qrort <qrort@yandex-team.com>  2022-11-30 23:47:12 +0300
committer qrort <qrort@yandex-team.com>  2022-11-30 23:47:12 +0300
commit    22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
tree      bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/go/maxprocs
parent    332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
download  ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/go/maxprocs')
-rw-r--r--  library/go/maxprocs/cgroups.go   174
-rw-r--r--  library/go/maxprocs/doc.go         9
-rw-r--r--  library/go/maxprocs/helpers.go    46
-rw-r--r--  library/go/maxprocs/maxprocs.go  159
4 files changed, 388 insertions, 0 deletions
diff --git a/library/go/maxprocs/cgroups.go b/library/go/maxprocs/cgroups.go
new file mode 100644
index 0000000000..0482788d37
--- /dev/null
+++ b/library/go/maxprocs/cgroups.go
@@ -0,0 +1,174 @@
+package maxprocs
+
+import (
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "path/filepath"
+ "runtime"
+ "strconv"
+ "strings"
+
+ "github.com/prometheus/procfs"
+
+ "a.yandex-team.ru/library/go/slices"
+)
+
+const (
+ unifiedHierarchy = "unified"
+ cpuHierarchy = "cpu"
+)
+
+var ErrNoCgroups = errors.New("no suitable cgroups were found")
+
+func isCgroupsExists() bool {
+ mounts, err := procfs.GetMounts()
+ if err != nil {
+ return false
+ }
+
+ for _, m := range mounts {
+ if m.FSType == "cgroup" || m.FSType == "cgroup2" {
+ return true
+ }
+ }
+
+ return false
+}
+
+func parseCgroupsMountPoints() (map[string]string, error) {
+ mounts, err := procfs.GetMounts()
+ if err != nil {
+ return nil, err
+ }
+
+ out := make(map[string]string)
+ for _, mount := range mounts {
+ switch mount.FSType {
+ case "cgroup2":
+ out[unifiedHierarchy] = mount.MountPoint
+ case "cgroup":
+ for opt := range mount.SuperOptions {
+ if opt == cpuHierarchy {
+ out[cpuHierarchy] = mount.MountPoint
+ break
+ }
+ }
+ }
+ }
+
+ return out, nil
+}
+
+func getCFSQuota() (float64, error) {
+ self, err := procfs.Self()
+ if err != nil {
+ return 0, err
+ }
+
+ selfCgroups, err := self.Cgroups()
+ if err != nil {
+ return 0, fmt.Errorf("parse self cgroups: %w", err)
+ }
+
+ cgroups, err := parseCgroupsMountPoints()
+ if err != nil {
+ return 0, fmt.Errorf("parse cgroups: %w", err)
+ }
+
+ if len(selfCgroups) == 0 || len(cgroups) == 0 {
+ return 0, ErrNoCgroups
+ }
+
+ for _, cgroup := range selfCgroups {
+ var quota float64
+ switch {
+ case cgroup.HierarchyID == 0:
+ // for cgroups v2 the hierarchy ID is always 0
+ mp, ok := cgroups[unifiedHierarchy]
+ if !ok {
+ continue
+ }
+
+ quota, _ = parseV2CPUQuota(mp, cgroup.Path)
+ case slices.ContainsString(cgroup.Controllers, cpuHierarchy):
+ mp, ok := cgroups[cpuHierarchy]
+ if !ok {
+ continue
+ }
+
+ quota, _ = parseV1CPUQuota(mp, cgroup.Path)
+ }
+
+ if quota > 0 {
+ return quota, nil
+ }
+ }
+
+ return 0, ErrNoCgroups
+}
+
+func parseV1CPUQuota(mountPoint string, cgroupPath string) (float64, error) {
+ basePath := filepath.Join(mountPoint, cgroupPath)
+ cfsQuota, err := readFileInt(filepath.Join(basePath, "cpu.cfs_quota_us"))
+ if err != nil {
+ return -1, fmt.Errorf("parse cpu.cfs_quota_us: %w", err)
+ }
+
+ // A value of -1 for cpu.cfs_quota_us indicates that the group does not have any
+ // bandwidth restriction in place
+ // https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
+ if cfsQuota == -1 {
+ return float64(runtime.NumCPU()), nil
+ }
+
+ cfsPeriod, err := readFileInt(filepath.Join(basePath, "cpu.cfs_period_us"))
+ if err != nil {
+ return -1, fmt.Errorf("parse cpu.cfs_period_us: %w", err)
+ }
+
+ return float64(cfsQuota) / float64(cfsPeriod), nil
+}
+
+func parseV2CPUQuota(mountPoint string, cgroupPath string) (float64, error) {
+ /*
+ https://www.kernel.org/doc/Documentation/cgroup-v2.txt
+
+ cpu.max
+ A read-write two value file which exists on non-root cgroups.
+ The default is "max 100000".
+
+ The maximum bandwidth limit. It's in the following format::
+ $MAX $PERIOD
+
+ which indicates that the group may consume upto $MAX in each
+ $PERIOD duration. "max" for $MAX indicates no limit. If only
+ one number is written, $MAX is updated.
+ */
+ rawCPUMax, err := ioutil.ReadFile(filepath.Join(mountPoint, cgroupPath, "cpu.max"))
+ if err != nil {
+ return -1, fmt.Errorf("read cpu.max: %w", err)
+ }
+
+ parts := strings.Fields(string(rawCPUMax))
+ if len(parts) != 2 {
+ return -1, fmt.Errorf("invalid cpu.max format: %s", string(rawCPUMax))
+ }
+
+ // "max" for $MAX indicates no limit
+ if parts[0] == "max" {
+ return float64(runtime.NumCPU()), nil
+ }
+
+ cpuMax, err := strconv.Atoi(parts[0])
+ if err != nil {
+ return -1, fmt.Errorf("parse cpu.max[max] (%q): %w", parts[0], err)
+ }
+
+ cpuPeriod, err := strconv.Atoi(parts[1])
+ if err != nil {
+ return -1, fmt.Errorf("parse cpu.max[period] (%q): %w", parts[1], err)
+ }
+
+ return float64(cpuMax) / float64(cpuPeriod), nil
+}
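
To make the cpu.max arithmetic concrete: a file containing "200000 100000" grants 200ms of CPU time per 100ms period, i.e. a quota of 2 CPUs. Below is a minimal standalone sketch of the same parsing math without the file I/O; the helper name and sample value are illustrative, not part of this diff.

    package main

    import (
        "fmt"
        "runtime"
        "strconv"
        "strings"
    )

    // quotaFromCPUMax mirrors parseV2CPUQuota's math for an in-memory
    // "$MAX $PERIOD" value; "max" for $MAX means no limit.
    func quotaFromCPUMax(raw string) (float64, error) {
        parts := strings.Fields(raw)
        if len(parts) != 2 {
            return -1, fmt.Errorf("invalid cpu.max format: %s", raw)
        }
        if parts[0] == "max" {
            return float64(runtime.NumCPU()), nil
        }
        maxVal, err := strconv.Atoi(parts[0])
        if err != nil {
            return -1, err
        }
        period, err := strconv.Atoi(parts[1])
        if err != nil {
            return -1, err
        }
        return float64(maxVal) / float64(period), nil
    }

    func main() {
        quota, _ := quotaFromCPUMax("200000 100000")
        fmt.Println(quota) // 2
    }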
diff --git a/library/go/maxprocs/doc.go b/library/go/maxprocs/doc.go
new file mode 100644
index 0000000000..2461d6022c
--- /dev/null
+++ b/library/go/maxprocs/doc.go
@@ -0,0 +1,9 @@
+// Package maxprocs automatically sets GOMAXPROCS to match the Yandex cloud container CPU quota.
+//
+// This package always adjusts GOMAXPROCS to a "safe" value.
+// "safe" values are:
+// - 2 or more
+// - no more than the number of logical cores
+// - no more than the container guarantees
+// - no more than 8
+package maxprocs
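
A minimal usage sketch for the package (the import path comes from this diff; the surrounding program is illustrative):

    package main

    import (
        "log"
        "runtime"

        "a.yandex-team.ru/library/go/maxprocs"
    )

    func main() {
        // AdjustAuto picks the first applicable limit source (GOMAXPROCS env,
        // cgroups, instancectl, Qloud, Y.Deploy, YP.Lite) and clamps the result.
        prev := maxprocs.AdjustAuto()
        log.Printf("GOMAXPROCS: %d -> %d", prev, runtime.GOMAXPROCS(0))
    }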
diff --git a/library/go/maxprocs/helpers.go b/library/go/maxprocs/helpers.go
new file mode 100644
index 0000000000..70263e6eb3
--- /dev/null
+++ b/library/go/maxprocs/helpers.go
@@ -0,0 +1,46 @@
+package maxprocs
+
+import (
+ "bytes"
+ "io/ioutil"
+ "math"
+ "os"
+ "strconv"
+)
+
+func getEnv(envName string) (string, bool) {
+ val, ok := os.LookupEnv(envName)
+ return val, ok && val != ""
+}
+
+func applyIntStringLimit(val string) int {
+ maxProc, err := strconv.Atoi(val)
+ if err != nil {
+ return Adjust(SafeProc)
+ }
+
+ return Adjust(maxProc)
+}
+
+func applyFloatStringLimit(val string) int {
+ maxProc, err := strconv.ParseFloat(val, 64)
+ if err != nil {
+ return Adjust(SafeProc)
+ }
+
+ return applyFloatLimit(maxProc)
+}
+
+func applyFloatLimit(val float64) int {
+ maxProc := int(math.Floor(val))
+ return Adjust(maxProc)
+}
+
+func readFileInt(filename string) (int, error) {
+ raw, err := ioutil.ReadFile(filename)
+ if err != nil {
+ return 0, err
+ }
+
+ return strconv.Atoi(string(bytes.TrimSpace(raw)))
+}
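
As a worked example of the helper pipeline above: an instancectl guarantee of "1.5c" is trimmed to "1.5" by the caller, parsed and floored to 1 by applyFloatLimit, then raised to MinProc (2) inside Adjust. The sketch below reproduces that clamping without the runtime.GOMAXPROCS side effect; the names here are hypothetical, not part of this diff.

    package main

    import (
        "fmt"
        "math"
        "runtime"
    )

    const minProc = 2 // mirrors maxprocs.MinProc

    // clamp applies Adjust's bounds to a fractional CPU guarantee:
    // floor it, then keep it within [minProc, runtime.NumCPU()].
    func clamp(guarantee float64) int {
        n := int(math.Floor(guarantee))
        if n < minProc {
            n = minProc
        }
        if nCPU := runtime.NumCPU(); n > nCPU {
            n = nCPU
        }
        return n
    }

    func main() {
        fmt.Println(clamp(1.5)) // 2 on any machine with 2+ logical cores
    }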
diff --git a/library/go/maxprocs/maxprocs.go b/library/go/maxprocs/maxprocs.go
new file mode 100644
index 0000000000..b5996ec6bc
--- /dev/null
+++ b/library/go/maxprocs/maxprocs.go
@@ -0,0 +1,159 @@
+package maxprocs
+
+import (
+ "context"
+ "os"
+ "runtime"
+ "strings"
+
+ "a.yandex-team.ru/library/go/yandex/deploy/podagent"
+ "a.yandex-team.ru/library/go/yandex/yplite"
+)
+
+const (
+ SafeProc = 4
+ MinProc = 2
+ MaxProc = 8
+
+ GoMaxProcEnvName = "GOMAXPROCS"
+ QloudCPUEnvName = "QLOUD_CPU_GUARANTEE"
+ InstancectlCPUEnvName = "CPU_GUARANTEE"
+ DeployBoxIDName = podagent.EnvBoxIDKey
+)
+
+// Adjust adjusts the maximum number of CPUs that can be executing, clamping n
+// to at least MinProc and at most the number of logical CPUs; it returns the previous setting.
+func Adjust(n int) int {
+ if n < MinProc {
+ n = MinProc
+ }
+
+ nCPU := runtime.NumCPU()
+ if n < nCPU {
+ return runtime.GOMAXPROCS(n)
+ }
+
+ return runtime.GOMAXPROCS(nCPU)
+}
+
+// AdjustAuto automatically adjusts the maximum number of CPUs that can be executing to a safe value
+// and returns the previous setting.
+func AdjustAuto() int {
+ if val, ok := getEnv(GoMaxProcEnvName); ok {
+ return applyIntStringLimit(val)
+ }
+
+ if isCgroupsExists() {
+ return AdjustCgroup()
+ }
+
+ if val, ok := getEnv(InstancectlCPUEnvName); ok {
+ return applyFloatStringLimit(strings.TrimRight(val, "c"))
+ }
+
+ if val, ok := getEnv(QloudCPUEnvName); ok {
+ return applyFloatStringLimit(val)
+ }
+
+ if boxID, ok := os.LookupEnv(DeployBoxIDName); ok {
+ return adjustYPBox(boxID)
+ }
+
+ if yplite.IsAPIAvailable() {
+ return AdjustYPLite()
+ }
+
+ return Adjust(SafeProc)
+}
+
+// AdjustQloud automatically adjusts the maximum number of CPUs for the Qloud environment
+// and returns the previous setting.
+func AdjustQloud() int {
+ if val, ok := getEnv(GoMaxProcEnvName); ok {
+ return applyIntStringLimit(val)
+ }
+
+ if val, ok := getEnv(QloudCPUEnvName); ok {
+ return applyFloatStringLimit(val)
+ }
+
+ return Adjust(MaxProc)
+}
+
+// AdjustYP automatically adjusts the maximum number of CPUs for YP/Y.Deploy/YP.Hard environments
+// and returns the previous setting.
+func AdjustYP() int {
+ if val, ok := getEnv(GoMaxProcEnvName); ok {
+ return applyIntStringLimit(val)
+ }
+
+ if isCgroupsExists() {
+ return AdjustCgroup()
+ }
+
+ return adjustYPBox(os.Getenv(DeployBoxIDName))
+}
+
+func adjustYPBox(boxID string) int {
+ resources, err := podagent.NewClient().PodAttributes(context.Background())
+ if err != nil {
+ return Adjust(SafeProc)
+ }
+
+ var cpuGuarantee float64
+ if boxResources, ok := resources.BoxesRequirements[boxID]; ok {
+ cpuGuarantee = boxResources.CPU.Guarantee / 1000
+ }
+
+ if cpuGuarantee <= 0 {
+ // if we don't have a guarantee for the current box, fall back to the pod guarantee
+ cpuGuarantee = resources.PodRequirements.CPU.Guarantee / 1000
+ }
+
+ return applyFloatLimit(cpuGuarantee)
+}
+
+// AdjustYPLite automatically adjusts the maximum number of CPUs for the YP.Lite environment
+// and returns the previous setting.
+func AdjustYPLite() int {
+ if val, ok := getEnv(GoMaxProcEnvName); ok {
+ return applyIntStringLimit(val)
+ }
+
+ podAttributes, err := yplite.FetchPodAttributes()
+ if err != nil {
+ return Adjust(SafeProc)
+ }
+
+ return applyFloatLimit(float64(podAttributes.ResourceRequirements.CPU.Guarantee) / 1000)
+}
+
+// AdjustInstancectl automatically adjusts the maximum number of CPUs
+// and returns the previous setting.
+// WARNING: only instancectl v1.177+ is supported (https://wiki.yandex-team.ru/runtime-cloud/nanny/instancectl-change-log/#1.177)
+func AdjustInstancectl() int {
+ if val, ok := getEnv(GoMaxProcEnvName); ok {
+ return applyIntStringLimit(val)
+ }
+
+ if val, ok := getEnv(InstancectlCPUEnvName); ok {
+ return applyFloatStringLimit(strings.TrimRight(val, "c"))
+ }
+
+ return Adjust(MaxProc)
+}
+
+// AdjustCgroup automatically adjusts the maximum number of CPUs based on the CFS quota
+// and returns the previous setting.
+func AdjustCgroup() int {
+ if val, ok := getEnv(GoMaxProcEnvName); ok {
+ return applyIntStringLimit(val)
+ }
+
+ quota, err := getCFSQuota()
+ if err != nil {
+ return Adjust(SafeProc)
+ }
+
+ return applyFloatLimit(quota)
+}
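
Note that every Adjust* entry point above checks the GOMAXPROCS environment variable before any platform detection, so an explicit setting always wins. A short illustrative sketch (the value 4 is arbitrary):

    package main

    import (
        "fmt"
        "os"
        "runtime"

        "a.yandex-team.ru/library/go/maxprocs"
    )

    func main() {
        // With GOMAXPROCS set, AdjustCgroup never consults the CFS quota;
        // it parses the variable and clamps it via Adjust.
        os.Setenv("GOMAXPROCS", "4")
        maxprocs.AdjustCgroup()
        fmt.Println(runtime.GOMAXPROCS(0)) // min(4, number of logical CPUs)
    }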