K8s HPA kube-metric-adapter源码阅读笔记

Posted by 爱折腾的工程师 on Tuesday, July 21, 2020


Kube Metrics Adapter是Kubernetes的通用指标适配器,可以收集和提供用于水平Pod自动缩放的自定义指标和外部指标。


它会发现Horizo​​ntal Pod Autoscaling资源,并开始收集请求的指标并将其存储在内存中。它是使用custom-metrics-apiserver库实现的。



apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
  name: myapp-hpa
    # metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
    metric-config.pods.requests-per-second.json-path/json-key: "$.http_server.rps"
    metric-config.pods.requests-per-second.json-path/path: /metrics
    metric-config.pods.requests-per-second.json-path/port: "9090"
    apiVersion: apps/v1
    kind: Deployment
    name: myapp
  minReplicas: 1
  maxReplicas: 10
  - type: Pods
        name: requests-per-second
        averageValue: 1k
        type: AverageValue

autoscaling/v2beta2 api在k8s 1.12+版本中可用




Prometheus collector

这里只分析Prometheus collector, 它与k8s-prometheus-adapter的区别在前面的文章介绍过了;


Metric Description Type Kind K8s Versions
prometheus-query Generic metric which requires a user defined query. External >=1.12
custom No predefined metrics. Metrics are generated from user defined queries. Object any >=1.12

Object metric已废弃,未来会被淘汰.



func main() {
    defer logs.FlushLogs()
    if len(os.Getenv("GOMAXPROCS")) == 0 {

    cmd := server.NewCommandStartAdapterServer(wait.NeverStop)
    if err := cmd.Execute(); err != nil {
  1. 日志初始化
  2. 设置Runtime Scheduler 中 Processor(简称P)的数量
  3. 初始化cobra Command对象,设置RunE运行函数->RunCustomMetricsAdapterServer
  4. 启动kube-metric-adapter api(custom-metrics-apiserver库实现)


    // 初始化CustomMetricsAdapterServerOptions对象, 涉及安全监听、认证、授权、特性启动
    baseOpts := server.NewCustomMetricsAdapterServerOptions()
    // 默认启动CustomMetricsAPI和ExternalMetricsAPI
    o := AdapterServerOptions{
        CustomMetricsAdapterServerOptions: baseOpts,
        EnableCustomMetricsAPI:            true,
        EnableExternalMetricsAPI:          true,
        MetricsAddress:                    ":7979",
        ZMONTokenName:                     "zmon",
        CredentialsDir:                    "/meta/credentials",


type AdapterServerOptions struct {

    // RemoteKubeConfigFile is the config used to list pods from the master API server
    RemoteKubeConfigFile string
    // EnableCustomMetricsAPI switches on sample apiserver for Custom Metrics API
    EnableCustomMetricsAPI bool
    // EnableExternalMetricsAPI switches on sample apiserver for External Metrics API
    EnableExternalMetricsAPI bool
    // PrometheusServer enables prometheus queries to the specified
    // server
    PrometheusServer string
    // InfluxDBAddress enables Flux queries to the specified InfluxDB instance
    InfluxDBAddress string
    // InfluxDBToken is the token used for querying InfluxDB
    InfluxDBToken string
    // InfluxDBOrg is the organization ID used for querying InfluxDB
    InfluxDBOrg string
    // ZMONKariosDBEndpoint enables ZMON check queries to the specified
    // kariosDB endpoint
    ZMONKariosDBEndpoint string
    // ZMONTokenName is the name of the token used to query ZMON
    ZMONTokenName string
    // Token is an oauth2 token used to authenticate with services like
    // ZMON.
    Token string
    // CredentialsDir is the path to the dir where tokens are stored
    CredentialsDir string
    // SkipperIngressMetrics switches on support for skipper ingress based
    // metric collection.
    SkipperIngressMetrics bool
    // AWSExternalMetrics switches on support for getting external metrics
    // from AWS.
    AWSExternalMetrics bool
    // AWSRegions the AWS regions which are supported for monitoring.
    AWSRegions []string
    // MetricsAddress is the address where to serve prometheus metrics.
    MetricsAddress string
    // SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
    SkipperBackendWeightAnnotation []string
    // Whether to disregard failing to create collectors for incompatible HPAs - such as when using
    // kube-metrics-adapter beside another Metrics Provider
    DisregardIncompatibleHPAs bool


func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct{}) error {
    // 暴露metrics给prometheus
    go func() {
        http.Handle("/metrics", promhttp.Handler())
        klog.Fatal(http.ListenAndServe(o.MetricsAddress, nil))
    // 构造apiserver的config
    config, err := o.Config()
    if err != nil {
        return err
    // 构造获取clientSet的clientConfig
    var clientConfig *rest.Config
    if len(o.RemoteKubeConfigFile) > 0 {
        // 集群外访问
        loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: o.RemoteKubeConfigFile}
        loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})

        clientConfig, err = loader.ClientConfig()
    } else {
        // 集群内访问
        clientConfig, err = rest.InClusterConfig()
    if err != nil {
        return fmt.Errorf("unable to construct lister client config to initialize provider: %v", err)

    // convert stop channel to a context
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
    // 设置clientSet超时时间
    clientConfig.Timeout = defaultClientGOTimeout

    client, err := kubernetes.NewForConfig(clientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize new client: %v", err)
    // 获取一个collector工厂对象
    collectorFactory := collector.NewCollectorFactory()

    // 注册prometheus collector
    if o.PrometheusServer != "" {
        promPlugin, err := collector.NewPrometheusCollectorPlugin(client, o.PrometheusServer)
        if err != nil {
            return fmt.Errorf("failed to initialize prometheus collector plugin: %v", err)

        err = collectorFactory.RegisterObjectCollector("", "prometheus", promPlugin)
        if err != nil {
            return fmt.Errorf("failed to register prometheus object collector plugin: %v", err)

        collectorFactory.RegisterExternalCollector([]string{collector.PrometheusMetricName}, promPlugin)
        // 注册Skipper collector
        // skipper collector can only be enabled if prometheus is.
        if o.SkipperIngressMetrics {
            skipperPlugin, err := collector.NewSkipperCollectorPlugin(client, promPlugin, o.SkipperBackendWeightAnnotation)
            if err != nil {
                return fmt.Errorf("failed to initialize skipper collector plugin: %v", err)

            err = collectorFactory.RegisterObjectCollector("Ingress", "", skipperPlugin)
            if err != nil {
                return fmt.Errorf("failed to register skipper collector plugin: %v", err)
    // 注册InfluxDB collector
    if o.InfluxDBAddress != "" {
        influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrg)
        if err != nil {
            return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
        collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
    // 注册HTTP collector
    plugin, _ := collector.NewHTTPCollectorPlugin()
    collectorFactory.RegisterExternalCollector([]string{collector.HTTPMetricName}, plugin)
    // 注册pod collector
    // register generic pod collector
    err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
    if err != nil {
        return fmt.Errorf("failed to register pod collector plugin: %v", err)

    // 注册ZMON collector
    // enable ZMON based metrics
    if o.ZMONKariosDBEndpoint != "" {
        var tokenSource oauth2.TokenSource
        if o.Token != "" {
            tokenSource = oauth2.StaticTokenSource(&oauth2.Token{AccessToken: o.Token})
        } else {
            tokenSource = platformiam.NewTokenSource(o.ZMONTokenName, o.CredentialsDir)

        httpClient := newOauth2HTTPClient(ctx, tokenSource)

        zmonClient := zmon.NewZMONClient(o.ZMONKariosDBEndpoint, httpClient)

        zmonPlugin, err := collector.NewZMONCollectorPlugin(zmonClient)
        if err != nil {
            return fmt.Errorf("failed to initialize ZMON collector plugin: %v", err)

        collectorFactory.RegisterExternalCollector([]string{collector.ZMONCheckMetric}, zmonPlugin)

    awsSessions := make(map[string]*session.Session, len(o.AWSRegions))
    for _, region := range o.AWSRegions {
        awsSessions[region], err = session.NewSession(&aws.Config{Region: aws.String(region)})
        if err != nil {
            return fmt.Errorf("unabled to create aws session for region: %s", region)
    // 注册aws collector
    if o.AWSExternalMetrics {
        collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
    // 初始化provider对象, 实现了MetricsProvider接口(github.com/kubernetes-incubator/custom-metrics-apiserver库)
    hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
    // 运行HPA资源发现和metric收集
    go hpaProvider.Run(ctx)

    customMetricsProvider := hpaProvider
    externalMetricsProvider := hpaProvider

    // var externalMetricsProvider := nil
    if !o.EnableCustomMetricsAPI {
        customMetricsProvider = nil
    if !o.EnableExternalMetricsAPI {
        externalMetricsProvider = nil

    informer := informers.NewSharedInformerFactory(client, 0)
    // 封装得比较好,k8s聚合API方式启动
    // In this example, the same provider implements both Custom Metrics API and External Metrics API
    server, err := config.Complete(informer).New("kube-metrics-adapter", customMetricsProvider, externalMetricsProvider)
    if err != nil {
        return err
    return server.GenericAPIServer.PrepareRun().Run(ctx.Done())



// collector分三种类型:pods、object、external
type CollectorFactory struct {
    podsPlugins     pluginMap
    // objectPluginMap的数据结构有点特殊,嵌套pluginMap,作用?
    objectPlugins   objectPluginMap
    // externalPlugins就一个map数据结构
    externalPlugins map[string]CollectorPlugin

type objectPluginMap struct {
    Any   pluginMap
    Named map[string]*pluginMap

type pluginMap struct {
    Any   CollectorPlugin
    Named map[string]CollectorPlugin

// 所有Collector插件都要实现CollectorPlugin接口定义的NewCollector方法,其返回的对象也要实现Collector接口定义的GetMetrics和Interval方法
type CollectorPlugin interface {
    NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)

type Collector interface {
    // 获取metrics
    GetMetrics() ([]CollectedMetric, error)
    // 收集间隔
    Interval() time.Duration

type CollectedMetric struct {
    // autoscalingv2 metric源类型
    // 有Object、Pods、Resource、External, 代码里有注释说明
    Type     autoscalingv2.MetricSourceType
    // 自定义metric的值
    Custom   custom_metrics.MetricValue
    // 外部metric的值
    External external_metrics.ExternalMetricValue


func NewCollectorFactory() *CollectorFactory {
    return &CollectorFactory{
        // pluginMap的Named是map数据结构,key是metricCollector的名称,形如prometheus、influxdb、json-path等
        podsPlugins: pluginMap{Named: map[string]CollectorPlugin{}},
        objectPlugins: objectPluginMap{
            Any:   pluginMap{},
            // map数据结构,key是Object的类型kind,形如Ingress等
            Named: map[string]*pluginMap{},
        // externalPlugins也是map数据结构,key是metricName,形如prometheus-query
        externalPlugins: map[string]CollectorPlugin{},

Prometheus Collector插件实现

Prometheus Collector插件的实现过程,其它插件类似的过程

+------------------------------------+            +------------------------------+
|                                    | 初始化对象   |                              |
|     NewPrometheusCollectorPlugin   +------------+ PrometheusCollectorPlugin对象 |
|                                    |            |                              |
+------------------------------------+            +-------------+----------------+
                                                    |                      |
                                                    |   NewCollector方法    |
                                                    |                      |
                                                    |                         |
                                                    |  NewPrometheusCollector |
                                                    |                         |
                                                     |                        |
                                                     | PrometheusCollector对象 |
                                                     |                        |
                                           +----------------+    |    +---------------+
                                           |                |    |    |               |
                                           | GetMetrics方法  +----+----+  Interval方法  |
                                           |                |         |               |
                                           +----------------+         +---------------+


const (
    PrometheusMetricName          = "prometheus-query"
    prometheusQueryNameLabelKey   = "query-name"
    prometheusServerAnnotationKey = "prometheus-server"

// 自定义错误类型
type NoResultError struct {
    query string

func (r NoResultError) Error() string {
    return fmt.Sprintf("query '%s' did not result a valid response", r.query)

// PrometheusCollectorPlugin结构体定义
type PrometheusCollectorPlugin struct {
    // 与prometheus api交互
    promAPI promv1.API
    // 与k8s api交互(client-go库的clientSet对象)
    client  kubernetes.Interface

// 初始化PrometheusCollectorPlugin对象
func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer string) (*PrometheusCollectorPlugin, error) {
    cfg := api.Config{
        Address:      prometheusServer,
        RoundTripper: http.DefaultTransport,
    // 根据prometheus server地址,获取一个跟prometheus api交互的client对象
    promClient, err := api.NewClient(cfg)
    if err != nil {
        return nil, err

    return &PrometheusCollectorPlugin{
        client:  client,
        promAPI: promv1.NewAPI(promClient),
    }, nil

// CollectorPlugin接口定义的方法
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
    return NewPrometheusCollector(p.client, p.promAPI, hpa, config, interval)

// 这里autoscaling使用的是v2beta2的API
type PrometheusCollector struct {
    client          kubernetes.Interface
    promAPI         promv1.API
    query           string
    metric          autoscalingv2.MetricIdentifier
    metricType      autoscalingv2.MetricSourceType
    objectReference custom_metrics.ObjectReference
    interval        time.Duration
    perReplica      bool
    hpa             *autoscalingv2.HorizontalPodAutoscaler

// metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
// <configKey> == query-name
// External metric类型:metric-config.external.prometheus-query.prometheus/processed-events-per-second: |
//         scalar(sum(rate(event-service_events_count{application="event-service",processed="true"}[1m])))
// Object metric类型:metric-config.object.processed-events-per-second.prometheus/per-replica: "true"
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
    c := &PrometheusCollector{
        client:     client,
        promAPI:    promAPI,
        interval:   interval,
        hpa:        hpa,
        metric:     config.Metric,
        metricType: config.Type,

    switch config.Type {
    // Object metric类型
    case autoscalingv2.ObjectMetricSourceType:
        c.objectReference = config.ObjectReference
        c.perReplica = config.PerReplica

        if v, ok := config.Config["query"]; ok {
            // TODO: validate query
            c.query = v
        } else {
            return nil, fmt.Errorf("no prometheus query defined")
    // External metric类型
    case autoscalingv2.ExternalMetricSourceType:
        // 必须设置Selector匹配labels
        if config.Metric.Selector == nil {
            return nil, fmt.Errorf("selector for prometheus query is not specified")
        // 获取metric-config.<metricType>.<metricName>.<collectorName>/<configKey>格式中的configKey
        queryName, ok := config.Config[prometheusQueryNameLabelKey]
        if !ok {
            return nil, fmt.Errorf("query name not specified on metric")
        // 根据configKey获取对应的值,也就是prom sql语句,形如scalar(sum(rate(event-service_events_count{application="event-service",processed="true"}[1m])))
        if v, ok := config.Config[queryName]; ok {
            // TODO: validate query
            c.query = v
        } else {
            return nil, fmt.Errorf("no prometheus query defined for metric")
        // 可以为每个HPA对象设置独立的prometheus server url, 会覆盖全局配置的prometheus server
        // Use custom Prometheus URL if defined in HPA annotation.
        if promServer, ok := config.Config[prometheusServerAnnotationKey]; ok {
            cfg := api.Config{
                Address:      promServer,
                RoundTripper: http.DefaultTransport,

            promClient, err := api.NewClient(cfg)
            if err != nil {
                return nil, err
            c.promAPI = promv1.NewAPI(promClient)

    return c, nil

// Collector接口定义的方法
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
    // 调用prometheus api查询
    // TODO: use real context
    value, _, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
    if err != nil {
        return nil, err

    var sampleValue model.SampleValue
    // 查询返回的数据类型有:matrix、vector、scalar、string,对应的格式见这里:https://zhuanlan.zhihu.com/p/121104877
    switch value.Type() {
    // 根据不同的响应类型进行数据处理
    case model.ValVector:
        samples := value.(model.Vector)
        if len(samples) == 0 {
            return nil, &NoResultError{query: c.query}

        sampleValue = samples[0].Value
    case model.ValScalar:
        scalar := value.(*model.Scalar)
        sampleValue = scalar.Value
    // 判断NaN非数,一般用于表示无效的除法操作结果0/0或Sqrt(-1)
    if math.IsNaN(float64(sampleValue)) {
        return nil, &NoResultError{query: c.query}

    if c.perReplica {
        // get current replicas for the targeted scale object. This is used to
        // calculate an average metric instead of total.
        // targetAverageValue will be available in Kubernetes v1.12
        // https://github.com/kubernetes/kubernetes/pull/64097
        // 获取当前Deployment/StatefulSet的副本数
        replicas, err := targetRefReplicas(c.client, c.hpa)
        if err != nil {
            return nil, err
        // 除以副本数,算平均值
        sampleValue = model.SampleValue(float64(sampleValue) / float64(replicas))

    var metricValue CollectedMetric
    // metric类型判断处理
    switch c.metricType {
    case autoscalingv2.ObjectMetricSourceType:
        metricValue = CollectedMetric{
            Type: c.metricType,
            Custom: custom_metrics.MetricValue{
                DescribedObject: c.objectReference,
                Metric:          custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.metric.Selector},
                Timestamp:       metav1.Time{Time: time.Now().UTC()},
                Value:           *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
    case autoscalingv2.ExternalMetricSourceType:
        metricValue = CollectedMetric{
            Type: c.metricType,
            External: external_metrics.ExternalMetricValue{
                MetricName:   c.metric.Name,
                MetricLabels: c.metric.Selector.MatchLabels,
                Timestamp:    metav1.Time{Time: time.Now().UTC()},
                Value:        *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
    // 只有一个metric,返回竟然是一个slice?
    return []CollectedMetric{metricValue}, nil

// Collector接口定义的方法
func (c *PrometheusCollector) Interval() time.Duration {
    return c.interval


// 写入CollectorFactory的objectPlugins.Any.Named map
err = collectorFactory.RegisterObjectCollector("", "prometheus", promPlugin)
        if err != nil {
            return fmt.Errorf("failed to register prometheus object collector plugin: %v", err)
// 写入externalPlugins map
collectorFactory.RegisterExternalCollector([]string{collector.PrometheusMetricName}, promPlugin)

HPA Provider

成为一个HPA Provider,就要实现MetricsProvider的接口


// CustomMetricInfo describes a metric for a particular
// fully-qualified group resource.
type CustomMetricInfo struct {
    GroupResource schema.GroupResource
    Namespaced    bool
    Metric        string

// ExternalMetricInfo describes a metric.
type ExternalMetricInfo struct {
    Metric string

// CustomMetricsProvider is a source of custom metrics
// which is able to supply a list of available metrics,
// as well as metric values themselves on demand.
// Note that group-resources are provided  as GroupResources,
// not GroupKinds.  This is to allow flexibility on the part
// of the implementor: implementors do not necessarily need
// to be aware of all existing kinds and their corresponding
// REST mappings in order to perform queries.
// For queries that use label selectors, it is up to the
// implementor to decide how to make use of the label selector --
// they may wish to query the main Kubernetes API server, or may
// wish to simply make use of stored information in their TSDB.
// CustomMetricsProvider接口
type CustomMetricsProvider interface {
    // GetMetricByName fetches a particular metric for a particular object.
    // The namespace will be empty if the metric is root-scoped.
    GetMetricByName(name types.NamespacedName, info CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error)

    // GetMetricBySelector fetches a particular metric for a set of objects matching
    // the given label selector.  The namespace will be empty if the metric is root-scoped.
    GetMetricBySelector(namespace string, selector labels.Selector, info CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error)

    // ListAllMetrics provides a list of all available metrics at
    // the current time.  Note that this is not allowed to return
    // an error, so it is reccomended that implementors cache and
    // periodically update this list, instead of querying every time.
    ListAllMetrics() []CustomMetricInfo

// ExternalMetricsProvider is a source of external metrics.
// Metric is normally identified by a name and a set of labels/tags. It is up to a specific
// implementation how to translate metricSelector to a filter for metric values.
// Namespace can be used by the implemetation for metric identification, access control or ignored.
// ExternalMetricsProvider接口
type ExternalMetricsProvider interface {
    GetExternalMetric(namespace string, metricSelector labels.Selector, info ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error)

    ListAllExternalMetrics() []ExternalMetricInfo

// MetricsProvider接口包含CustomMetricsProvider和ExternalMetricsProvider两个接口
type MetricsProvider interface {


// metricCollection is a container for sending collected metrics across a
// channel.
type metricCollection struct {
    Values []collector.CollectedMetric
    Error  error

// NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
    // channel中发送[]collector.CollectedMetric
    metricsc := make(chan metricCollection)
    // HPAProvider对象
    return &HPAProvider{
        client:            client,
        interval:          interval,
        collectorInterval: collectorInterval,
        metricSink:        metricsc,
        metricStore: NewMetricStore(func() time.Time {
            // 匿名函数,当前时间+15分钟的Time对象
            return time.Now().UTC().Add(15 * time.Minute)
        collectorFactory:          collectorFactory,
        recorder:                  recorder.CreateEventRecorder(client),
        logger:                    log.WithFields(log.Fields{"provider": "hpa"}),
        disregardIncompatibleHPAs: disregardIncompatibleHPAs,


// GetMetricByName gets a single metric by name.
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
    metric := p.metricStore.GetMetricsByName(name, info)
    if metric == nil {
        return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
    return metric, nil

// GetMetricBySelector returns metrics for namespaced resources by
// label selector.
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
    return p.metricStore.GetMetricsBySelector(namespace, selector, info), nil

// ListAllMetrics list all available metrics from the provicer.
func (p *HPAProvider) ListAllMetrics() []provider.CustomMetricInfo {
    return p.metricStore.ListAllMetrics()

func (p *HPAProvider) GetExternalMetric(namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
    return p.metricStore.GetExternalMetric(namespace, metricSelector, info)

func (p *HPAProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
    return p.metricStore.ListAllExternalMetrics()


// customMetricsStoredMetric is a wrapper around custom_metrics.MetricValue with a metricsTTL used
// to clean up stale metrics from the customMetricsStore.
type customMetricsStoredMetric struct {
    Value custom_metrics.MetricValue
    TTL   time.Time

type externalMetricsStoredMetric struct {
    Value external_metrics.ExternalMetricValue
    TTL   time.Time

// MetricStore is a simple in-memory Metrics Store for HPA metrics.
type MetricStore struct {
    customMetricsStore   map[string]map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric
    externalMetricsStore map[string]map[string]externalMetricsStoredMetric
    metricsTTLCalculator func() time.Time

// NewMetricStore initializes an empty Metrics Store.
func NewMetricStore(ttlCalculator func() time.Time) *MetricStore {
    return &MetricStore{
        // 4层map嵌套数据结构?
        customMetricsStore:   make(map[string]map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric, 0),
        // 2层map嵌套数据结构
        externalMetricsStore: make(map[string]map[string]externalMetricsStoredMetric, 0),
        metricsTTLCalculator: ttlCalculator,


// Run runs the HPA resource discovery and metric collection.
func (p *HPAProvider) Run(ctx context.Context) {
    // initialize collector table
    // 初始化CollectorScheduler对象
    p.collectorScheduler = NewCollectorScheduler(ctx, p.metricSink)
    // 从collector收集所有的metric, 写入metricStore
    go p.collectMetrics(ctx)

    for {
        // 解析集群中所有的HPA对象,根据HPA对象中的配置构建Collector
        err := p.updateHPAs()
        if err != nil {
        } else {

        select {
        // 间隔30s
        case <-time.After(p.interval):
        case <-ctx.Done():
            p.logger.Info("Stopped HPA provider.")

// NewCollectorScheudler initializes a new CollectorScheduler.
func NewCollectorScheduler(ctx context.Context, metricsc chan<- metricCollection) *CollectorScheduler {
    return &CollectorScheduler{
        ctx:        ctx,
        table:      map[resourceReference]map[collector.MetricTypeName]context.CancelFunc{},
        metricSink: metricsc,

// CollectorScheduler is a scheduler for running metric collection jobs.
// It keeps track of all running collectors and stops them if they are to be
// removed.
type CollectorScheduler struct {
    ctx        context.Context
    // hap资源与metric指标的映射关系;第一层map数据结构,key是hap资源,value是第二层map,存放metric与ctx的自定义CancelFunc
    table      map[resourceReference]map[collector.MetricTypeName]context.CancelFunc
    metricSink chan<- metricCollection


// collectMetrics collects all metrics from collectors and manages a central
// metric store.
func (p *HPAProvider) collectMetrics(ctx context.Context) {
    // run garbage collection every 10 minutes
    go func(ctx context.Context) {
        for {
            select {
            case <-time.After(10 * time.Minute):
                // 间隔10分钟,移除MetricStore中customMetricsStore/externalMetricsStore中ttl过期的metric(ttl时间15分钟)
            case <-ctx.Done():
                p.logger.Info("Stopped metrics store garbage collection.")

    for {
        select {
        // 从metricSink channel中取值
        case collection := <-p.metricSink:
            if collection.Error != nil {
                p.logger.Errorf("Failed to collect metrics: %v", collection.Error)
            } else {

            p.logger.Infof("Collected %d new metric(s)", len(collection.Values))
            // 根据CollectedMetric的类型来处理
            for _, value := range collection.Values {
                switch value.Type {
                case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
                    p.logger.Infof("Collected new custom metric '%s' (%s) for %s %s/%s",
                case autoscalingv2.ExternalMetricSourceType:
                    p.logger.Infof("Collected new external metric '%s' (%s) [%s]",
                // metric是Object、Pods类型,写入customMetricsStore
                // metric是External类型,写入externalMetricsStore
        case <-ctx.Done():
            p.logger.Info("Stopped metrics collection.")

// 解析HPA对象,构建Collector

// updateHPAs discovers all HPA resources and sets up metric collectors for new
// HPAs.
func (p *HPAProvider) updateHPAs() error {
    p.logger.Info("Looking for HPAs")
    // 获取集群所有的HPA对象(v2beta2 api)
    hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        return err
    // HPA map缓存
    newHPACache := make(map[resourceReference]autoscalingv2.HorizontalPodAutoscaler, len(hpas.Items))

    newHPAs := 0

    for _, hpa := range hpas.Items {
        resourceRef := resourceReference{
            Name:      hpa.Name,
            Namespace: hpa.Namespace,
        // 先从缓存里获取
        cachedHPA, ok := p.hpaCache[resourceRef]
        hpaUpdated := !equalHPA(cachedHPA, hpa)
        if !ok || hpaUpdated {
            // if the hpa has changed then remove the previous
            // scheduled collector.
            if hpaUpdated {
                p.logger.Infof("Removing previously scheduled metrics collector: %s", resourceRef)
            // 解析HPA对象里的以metric-config为前缀的annotation,返回[]*MetricConfig;可以设置两个特殊的annotation
            // metric-config.*.*.*/per-replica,会自动把metric值除以副本数
            // metric-config.*.*.*/interval,可设置局部的收集间隔,覆盖全局的metric收集间隔collectorInterval
            metricConfigs, err := collector.ParseHPAMetrics(&hpa)
            if err != nil {
                p.logger.Errorf("Failed to parse HPA metrics: %v", err)

            cache := true
            for _, config := range metricConfigs {
                interval := config.Interval
                if interval == 0 {
                    interval = p.collectorInterval
                // 从collector工厂里,根据HPA里面定义的metricType调用对应的collector插件返回Collector对象
                // 这里以prometheus collector插件为例
                c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
                if err != nil {

                    // Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
                    if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
                        p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)

                    cache = false

                p.logger.Infof("Adding new metrics collector: %T", c)
                // 添加collector到CollectorScheduler的table,重复添加的话就先停止旧的collector
                // 每隔上面collector间隔时间就调用collector插件实现的GetMetrics()获取指标值写入metricCollection channel                
                p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)

            // if we get an error setting up the collectors for the
            // HPA, don't cache it, but try again later.
            if !cache {
        // 写入新的HPA Cache中
        newHPACache[resourceRef] = hpa

    for ref := range p.hpaCache {
        if _, ok := newHPACache[ref]; ok {

        p.logger.Infof("Removing previously scheduled metrics collector: %s", ref)

    p.logger.Infof("Found %d new/updated HPA(s)", newHPAs)
    // 更新缓存
    p.hpaCache = newHPACache
    return nil

一个HPA Provider的运行过程分析就是这样了

HPA Controller生成custom/external.metrics.k8s.io

HPA对象的指标转化为custom/external.metrics.k8s.io API可获取的指标;这部分工作是由kube-controller-manager的HPA controller实现的

HPA Controller -> Metrics Aggregator -> Kube-metric-adapter -> Prometheus





