author    | qrort <qrort@yandex-team.com>                   | 2022-11-30 23:47:12 +0300
committer | qrort <qrort@yandex-team.com>                   | 2022-11-30 23:47:12 +0300
commit    | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
tree      | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/go/maxprocs
parent    | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
download  | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/go/maxprocs')
-rw-r--r-- | library/go/maxprocs/cgroups.go  | 174
-rw-r--r-- | library/go/maxprocs/doc.go      |   9
-rw-r--r-- | library/go/maxprocs/helpers.go  |  46
-rw-r--r-- | library/go/maxprocs/maxprocs.go | 159
4 files changed, 388 insertions, 0 deletions
diff --git a/library/go/maxprocs/cgroups.go b/library/go/maxprocs/cgroups.go
new file mode 100644
index 0000000000..0482788d37
--- /dev/null
+++ b/library/go/maxprocs/cgroups.go
@@ -0,0 +1,174 @@
+package maxprocs
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"path/filepath"
+	"runtime"
+	"strconv"
+	"strings"
+
+	"github.com/prometheus/procfs"
+
+	"a.yandex-team.ru/library/go/slices"
+)
+
+const (
+	unifiedHierarchy = "unified"
+	cpuHierarchy     = "cpu"
+)
+
+var ErrNoCgroups = errors.New("no suitable cgroups were found")
+
+func isCgroupsExists() bool {
+	mounts, err := procfs.GetMounts()
+	if err != nil {
+		return false
+	}
+
+	for _, m := range mounts {
+		if m.FSType == "cgroup" || m.FSType == "cgroup2" {
+			return true
+		}
+	}
+
+	return false
+}
+
+func parseCgroupsMountPoints() (map[string]string, error) {
+	mounts, err := procfs.GetMounts()
+	if err != nil {
+		return nil, err
+	}
+
+	out := make(map[string]string)
+	for _, mount := range mounts {
+		switch mount.FSType {
+		case "cgroup2":
+			out[unifiedHierarchy] = mount.MountPoint
+		case "cgroup":
+			for opt := range mount.SuperOptions {
+				if opt == cpuHierarchy {
+					out[cpuHierarchy] = mount.MountPoint
+					break
+				}
+			}
+		}
+	}
+
+	return out, nil
+}
+
+func getCFSQuota() (float64, error) {
+	self, err := procfs.Self()
+	if err != nil {
+		return 0, err
+	}
+
+	selfCgroups, err := self.Cgroups()
+	if err != nil {
+		return 0, fmt.Errorf("parse self cgroups: %w", err)
+	}
+
+	cgroups, err := parseCgroupsMountPoints()
+	if err != nil {
+		return 0, fmt.Errorf("parse cgroups: %w", err)
+	}
+
+	if len(selfCgroups) == 0 || len(cgroups) == 0 {
+		return 0, ErrNoCgroups
+	}
+
+	for _, cgroup := range selfCgroups {
+		var quota float64
+		switch {
+		case cgroup.HierarchyID == 0:
+			// for cgroups v2 the hierarchy ID is always 0
+			mp, ok := cgroups[unifiedHierarchy]
+			if !ok {
+				continue
+			}
+
+			quota, _ = parseV2CPUQuota(mp, cgroup.Path)
+		case slices.ContainsString(cgroup.Controllers, cpuHierarchy):
+			mp, ok := cgroups[cpuHierarchy]
+			if !ok {
+				continue
+			}
+
+			quota, _ = parseV1CPUQuota(mp, cgroup.Path)
+		}
+
+		if quota > 0 {
+			return quota, nil
+		}
+	}
+
+	return 0, ErrNoCgroups
+}
+
+func parseV1CPUQuota(mountPoint string, cgroupPath string) (float64, error) {
+	basePath := filepath.Join(mountPoint, cgroupPath)
+	cfsQuota, err := readFileInt(filepath.Join(basePath, "cpu.cfs_quota_us"))
+	if err != nil {
+		return -1, fmt.Errorf("parse cpu.cfs_quota_us: %w", err)
+	}
+
+	// A value of -1 for cpu.cfs_quota_us indicates that the group does not have any
+	// bandwidth restriction in place.
+	// https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
+	if cfsQuota == -1 {
+		return float64(runtime.NumCPU()), nil
+	}
+
+	cfsPeriod, err := readFileInt(filepath.Join(basePath, "cpu.cfs_period_us"))
+	if err != nil {
+		return -1, fmt.Errorf("parse cpu.cfs_period_us: %w", err)
+	}
+
+	return float64(cfsQuota) / float64(cfsPeriod), nil
+}
+
+func parseV2CPUQuota(mountPoint string, cgroupPath string) (float64, error) {
+	/*
+		https://www.kernel.org/doc/Documentation/cgroup-v2.txt
+
+		cpu.max
+			A read-write two value file which exists on non-root cgroups.
+			The default is "max 100000".
+
+			The maximum bandwidth limit. It's in the following format::
+				$MAX $PERIOD
+
+			which indicates that the group may consume upto $MAX in each
+			$PERIOD duration. "max" for $MAX indicates no limit. If only
+			one number is written, $MAX is updated.
+	*/
+	rawCPUMax, err := ioutil.ReadFile(filepath.Join(mountPoint, cgroupPath, "cpu.max"))
+	if err != nil {
+		return -1, fmt.Errorf("read cpu.max: %w", err)
+	}
+
+	parts := strings.Fields(string(rawCPUMax))
+	if len(parts) != 2 {
+		return -1, fmt.Errorf("invalid cpu.max format: %s", string(rawCPUMax))
+	}
+
+	// "max" for $MAX indicates no limit
+	if parts[0] == "max" {
+		return float64(runtime.NumCPU()), nil
+	}
+
+	cpuMax, err := strconv.Atoi(parts[0])
+	if err != nil {
+		return -1, fmt.Errorf("parse cpu.max[max] (%q): %w", parts[0], err)
+	}
+
+	cpuPeriod, err := strconv.Atoi(parts[1])
+	if err != nil {
+		return -1, fmt.Errorf("parse cpu.max[period] (%q): %w", parts[1], err)
+	}
+
+	return float64(cpuMax) / float64(cpuPeriod), nil
+}
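Both parsers above implement the same arithmetic for the two cgroup versions: quota = $MAX / $PERIOD, with "no limit" mapped to runtime.NumCPU(). A minimal standalone sketch of the v2 arithmetic (quotaFromCPUMax is a hypothetical helper written for illustration; it is not part of this diff):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// quotaFromCPUMax mirrors the cpu.max parsing above: the file holds
// "$MAX $PERIOD", and "max" for $MAX means the group is unlimited
// (reported here as limited == false so the caller can fall back to NumCPU).
func quotaFromCPUMax(raw string) (quota float64, limited bool, err error) {
	parts := strings.Fields(raw)
	if len(parts) != 2 {
		return 0, false, fmt.Errorf("invalid cpu.max format: %s", raw)
	}
	if parts[0] == "max" {
		return 0, false, nil
	}
	maxVal, err := strconv.Atoi(parts[0])
	if err != nil {
		return 0, false, err
	}
	period, err := strconv.Atoi(parts[1])
	if err != nil {
		return 0, false, err
	}
	return float64(maxVal) / float64(period), true, nil
}

func main() {
	// 200000us of CPU time per 100000us period == 2 CPUs worth of bandwidth
	quota, limited, err := quotaFromCPUMax("200000 100000")
	fmt.Println(quota, limited, err) // 2 true <nil>
}
```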
diff --git a/library/go/maxprocs/doc.go b/library/go/maxprocs/doc.go
new file mode 100644
index 0000000000..2461d6022c
--- /dev/null
+++ b/library/go/maxprocs/doc.go
@@ -0,0 +1,9 @@
+// Automatically sets GOMAXPROCS to match the Yandex cloud container CPU quota.
+//
+// This package always adjusts GOMAXPROCS to some "safe" value.
+// "safe" values are:
+// - 2 or more
+// - no more than the number of logical cores
+// - no more than the container CPU guarantee
+// - no more than 8
+package maxprocs
diff --git a/library/go/maxprocs/helpers.go b/library/go/maxprocs/helpers.go
new file mode 100644
index 0000000000..70263e6eb3
--- /dev/null
+++ b/library/go/maxprocs/helpers.go
@@ -0,0 +1,46 @@
+package maxprocs
+
+import (
+	"bytes"
+	"io/ioutil"
+	"math"
+	"os"
+	"strconv"
+)
+
+func getEnv(envName string) (string, bool) {
+	val, ok := os.LookupEnv(envName)
+	return val, ok && val != ""
+}
+
+func applyIntStringLimit(val string) int {
+	maxProc, err := strconv.Atoi(val)
+	if err == nil {
+		return Adjust(maxProc)
+	}
+
+	return Adjust(SafeProc)
+}
+
+func applyFloatStringLimit(val string) int {
+	maxProc, err := strconv.ParseFloat(val, 64)
+	if err != nil {
+		return Adjust(SafeProc)
+	}
+
+	return applyFloatLimit(maxProc)
+}
+
+func applyFloatLimit(val float64) int {
+	maxProc := int(math.Floor(val))
+	return Adjust(maxProc)
+}
+
+func readFileInt(filename string) (int, error) {
+	raw, err := ioutil.ReadFile(filename)
+	if err != nil {
+		return 0, err
+	}
+
+	return strconv.Atoi(string(bytes.TrimSpace(raw)))
+}
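All of these helpers funnel their result through Adjust from maxprocs.go (next file), which clamps the requested value into the "safe" range described in doc.go. A sketch of just that clamping rule, assuming an 8-core host (clamp is an illustrative restatement, not the exported API):

```go
package main

import "fmt"

// clamp restates Adjust's bounds for illustration: never below MinProc (2)
// and never above the host's logical core count. The real Adjust also
// applies the result via runtime.GOMAXPROCS and returns the old setting.
func clamp(n, numCPU int) int {
	if n < 2 {
		n = 2
	}
	if n > numCPU {
		return numCPU
	}
	return n
}

func main() {
	for _, n := range []int{0, 3, 64} {
		fmt.Printf("Adjust(%d) on an 8-core host -> GOMAXPROCS %d\n", n, clamp(n, 8))
	}
	// Output:
	// Adjust(0) on an 8-core host -> GOMAXPROCS 2
	// Adjust(3) on an 8-core host -> GOMAXPROCS 3
	// Adjust(64) on an 8-core host -> GOMAXPROCS 8
}
```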
diff --git a/library/go/maxprocs/maxprocs.go b/library/go/maxprocs/maxprocs.go
new file mode 100644
index 0000000000..b5996ec6bc
--- /dev/null
+++ b/library/go/maxprocs/maxprocs.go
@@ -0,0 +1,159 @@
+package maxprocs
+
+import (
+	"context"
+	"os"
+	"runtime"
+	"strings"
+
+	"a.yandex-team.ru/library/go/yandex/deploy/podagent"
+	"a.yandex-team.ru/library/go/yandex/yplite"
+)
+
+const (
+	SafeProc = 4
+	MinProc  = 2
+	MaxProc  = 8
+
+	GoMaxProcEnvName      = "GOMAXPROCS"
+	QloudCPUEnvName       = "QLOUD_CPU_GUARANTEE"
+	InstancectlCPUEnvName = "CPU_GUARANTEE"
+	DeloyBoxIDName        = podagent.EnvBoxIDKey
+)
+
+// Adjust adjusts the maximum number of CPUs that can be executing simultaneously.
+// It clamps n to [MinProc, runtime.NumCPU()] and returns the previous setting.
+func Adjust(n int) int {
+	if n < MinProc {
+		n = MinProc
+	}
+
+	nCPU := runtime.NumCPU()
+	if n < nCPU {
+		return runtime.GOMAXPROCS(n)
+	}
+
+	return runtime.GOMAXPROCS(nCPU)
+}
+
+// AdjustAuto automatically adjusts the maximum number of CPUs that can be executing
+// to a safe value and returns the previous setting.
+func AdjustAuto() int {
+	if val, ok := getEnv(GoMaxProcEnvName); ok {
+		return applyIntStringLimit(val)
+	}
+
+	if isCgroupsExists() {
+		return AdjustCgroup()
+	}
+
+	if val, ok := getEnv(InstancectlCPUEnvName); ok {
+		return applyFloatStringLimit(strings.TrimRight(val, "c"))
+	}
+
+	if val, ok := getEnv(QloudCPUEnvName); ok {
+		return applyFloatStringLimit(val)
+	}
+
+	if boxID, ok := os.LookupEnv(DeloyBoxIDName); ok {
+		return adjustYPBox(boxID)
+	}
+
+	if yplite.IsAPIAvailable() {
+		return AdjustYPLite()
+	}
+
+	return Adjust(SafeProc)
+}
+
+// AdjustQloud automatically adjusts the maximum number of CPUs in a Qloud environment
+// and returns the previous setting.
+func AdjustQloud() int {
+	if val, ok := getEnv(GoMaxProcEnvName); ok {
+		return applyIntStringLimit(val)
+	}
+
+	if val, ok := getEnv(QloudCPUEnvName); ok {
+		return applyFloatStringLimit(val)
+	}
+
+	return Adjust(MaxProc)
+}
+
+// AdjustYP automatically adjusts the maximum number of CPUs in a YP/Y.Deploy/YP.Hard
+// environment and returns the previous setting.
+func AdjustYP() int {
+	if val, ok := getEnv(GoMaxProcEnvName); ok {
+		return applyIntStringLimit(val)
+	}
+
+	if isCgroupsExists() {
+		return AdjustCgroup()
+	}
+
+	return adjustYPBox(os.Getenv(DeloyBoxIDName))
+}
+
+func adjustYPBox(boxID string) int {
+	resources, err := podagent.NewClient().PodAttributes(context.Background())
+	if err != nil {
+		return Adjust(SafeProc)
+	}
+
+	var cpuGuarantee float64
+	if boxResources, ok := resources.BoxesRequirements[boxID]; ok {
+		cpuGuarantee = boxResources.CPU.Guarantee / 1000
+	}
+
+	if cpuGuarantee <= 0 {
+		// if there is no guarantee for the current box, fall back to the pod guarantee
+		cpuGuarantee = resources.PodRequirements.CPU.Guarantee / 1000
+	}
+
+	return applyFloatLimit(cpuGuarantee)
+}
+
+// AdjustYPLite automatically adjusts the maximum number of CPUs in a YP.Lite environment
+// and returns the previous setting.
+func AdjustYPLite() int {
+	if val, ok := getEnv(GoMaxProcEnvName); ok {
+		return applyIntStringLimit(val)
+	}
+
+	podAttributes, err := yplite.FetchPodAttributes()
+	if err != nil {
+		return Adjust(SafeProc)
+	}
+
+	return applyFloatLimit(float64(podAttributes.ResourceRequirements.CPU.Guarantee / 1000))
+}
+
+// AdjustInstancectl automatically adjusts the maximum number of CPUs
+// and returns the previous setting.
+// WARNING: only instancectl v1.177+ is supported (https://wiki.yandex-team.ru/runtime-cloud/nanny/instancectl-change-log/#1.177)
+func AdjustInstancectl() int {
+	if val, ok := getEnv(GoMaxProcEnvName); ok {
+		return applyIntStringLimit(val)
+	}
+
+	if val, ok := getEnv(InstancectlCPUEnvName); ok {
+		return applyFloatStringLimit(strings.TrimRight(val, "c"))
+	}
+
+	return Adjust(MaxProc)
+}
+
+// AdjustCgroup automatically adjusts the maximum number of CPUs based on the CFS quota
+// and returns the previous setting.
+func AdjustCgroup() int {
+	if val, ok := getEnv(GoMaxProcEnvName); ok {
+		return applyIntStringLimit(val)
+	}
+
+	quota, err := getCFSQuota()
+	if err != nil {
+		return Adjust(SafeProc)
+	}
+
+	return applyFloatLimit(quota)
+}
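Taken together, a typical consumer only needs one call at program start. A minimal usage sketch (the a.yandex-team.ru import path is taken from the imports inside this diff; package resolution outside that build system may differ):

```go
package main

import (
	"fmt"
	"runtime"

	"a.yandex-team.ru/library/go/maxprocs"
)

func main() {
	// Pick a limit from GOMAXPROCS/cgroups/instancectl/Qloud/Y.Deploy/YP.Lite,
	// in that order, falling back to SafeProc (4).
	prev := maxprocs.AdjustAuto()
	fmt.Printf("GOMAXPROCS adjusted: %d -> %d\n", prev, runtime.GOMAXPROCS(0))
}
```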