2020-06-27 17:26:46 +09:00
package controllers
import (
"context"
2020-07-03 09:05:46 +09:00
"errors"
2020-06-27 17:26:46 +09:00
"fmt"
2020-12-14 21:38:01 -08:00
"math"
2020-12-12 15:48:19 -08:00
"strconv"
2020-06-27 17:26:46 +09:00
"strings"
2021-02-07 17:37:27 +09:00
"time"
2020-10-07 17:00:44 -07:00
"github.com/summerwind/actions-runner-controller/api/v1alpha1"
2020-12-12 15:48:19 -08:00
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Defaults used by calculateReplicasByPercentageRunnersBusy when the
// corresponding spec.autoscaling.metrics[] field is left empty.
const (
	// Busy-runner fractions: at or above the scale-up threshold the runner
	// count grows; strictly below the scale-down threshold it shrinks.
	defaultScaleUpThreshold   = 0.8
	defaultScaleDownThreshold = 0.3

	// Multipliers applied to the current runner count when no explicit
	// scaleUpAdjustment / scaleDownAdjustment is configured.
	defaultScaleUpFactor   = 1.3
	defaultScaleDownFactor = 0.7
)
2021-02-07 17:37:27 +09:00
// getValueAvailableAt returns a pointer to reservedValue when now falls inside
// the optional window [from, to], and nil otherwise. A nil bound leaves that
// side of the window open. Both bounds are inclusive: now equal to *from or
// *to still yields the value.
func getValueAvailableAt(now time.Time, from, to *time.Time, reservedValue int) *int {
	expired := to != nil && now.After(*to)
	notYetStarted := from != nil && now.Before(*from)
	if expired || notYetStarted {
		return nil
	}
	return &reservedValue
}
// getDesiredReplicasFromCache scans hra.Status.CacheEntries for the first
// desired-replicas entry whose expiration time is still in the future and
// returns a pointer to its cached value. It returns nil when no such entry
// exists, or when that entry has passed its expiration by the time of the
// second availability check.
func (r *HorizontalRunnerAutoscalerReconciler) getDesiredReplicasFromCache(hra v1alpha1.HorizontalRunnerAutoscaler) *int {
	for _, candidate := range hra.Status.CacheEntries {
		if candidate.Key != v1alpha1.CacheEntryKeyDesiredReplicas {
			continue
		}
		if !time.Now().Before(candidate.ExpirationTime.Time) {
			continue
		}
		// Only the first live entry is considered; its value is re-validated
		// against a fresh timestamp with no lower bound.
		return getValueAvailableAt(time.Now(), nil, &candidate.ExpirationTime.Time, candidate.Value)
	}
	return nil
}
2020-07-19 18:42:06 +09:00
func ( r * HorizontalRunnerAutoscalerReconciler ) determineDesiredReplicas ( rd v1alpha1 . RunnerDeployment , hra v1alpha1 . HorizontalRunnerAutoscaler ) ( * int , error ) {
if hra . Spec . MinReplicas == nil {
return nil , fmt . Errorf ( "horizontalrunnerautoscaler %s/%s is missing minReplicas" , hra . Namespace , hra . Name )
} else if hra . Spec . MaxReplicas == nil {
return nil , fmt . Errorf ( "horizontalrunnerautoscaler %s/%s is missing maxReplicas" , hra . Namespace , hra . Name )
2020-06-27 17:26:46 +09:00
}
2020-12-12 15:48:19 -08:00
metrics := hra . Spec . Metrics
if len ( metrics ) == 0 || metrics [ 0 ] . Type == v1alpha1 . AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns {
return r . calculateReplicasByQueuedAndInProgressWorkflowRuns ( rd , hra )
} else if metrics [ 0 ] . Type == v1alpha1 . AutoscalingMetricTypePercentageRunnersBusy {
return r . calculateReplicasByPercentageRunnersBusy ( rd , hra )
} else {
return nil , fmt . Errorf ( "validting autoscaling metrics: unsupported metric type %q" , metrics [ 0 ] . Type )
}
}
func ( r * HorizontalRunnerAutoscalerReconciler ) calculateReplicasByQueuedAndInProgressWorkflowRuns ( rd v1alpha1 . RunnerDeployment , hra v1alpha1 . HorizontalRunnerAutoscaler ) ( * int , error ) {
2020-06-27 17:26:46 +09:00
2020-12-12 15:48:19 -08:00
var repos [ ] [ ] string
metrics := hra . Spec . Metrics
2020-06-27 17:26:46 +09:00
repoID := rd . Spec . Template . Spec . Repository
if repoID == "" {
2020-07-03 09:05:46 +09:00
orgName := rd . Spec . Template . Spec . Organization
if orgName == "" {
return nil , fmt . Errorf ( "asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path" )
}
2020-12-12 15:48:19 -08:00
if len ( metrics [ 0 ] . RepositoryNames ) == 0 {
2020-07-03 09:05:46 +09:00
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment" )
}
for _ , repoName := range metrics [ 0 ] . RepositoryNames {
repos = append ( repos , [ ] string { orgName , repoName } )
}
} else {
repo := strings . Split ( repoID , "/" )
repos = append ( repos , repo )
2020-06-27 17:26:46 +09:00
}
var total , inProgress , queued , completed , unknown int
2020-10-07 17:00:44 -07:00
type callback func ( )
listWorkflowJobs := func ( user string , repoName string , runID int64 , fallback_cb callback ) {
if runID == 0 {
fallback_cb ( )
return
}
jobs , _ , err := r . GitHubClient . Actions . ListWorkflowJobs ( context . TODO ( ) , user , repoName , runID , nil )
if err != nil {
r . Log . Error ( err , "Error listing workflow jobs" )
fallback_cb ( )
} else if len ( jobs . Jobs ) == 0 {
fallback_cb ( )
} else {
for _ , job := range jobs . Jobs {
switch job . GetStatus ( ) {
case "completed" :
// We add a case for `completed` so it is not counted in `unknown`.
// And we do not increment the counter for completed because
// that counter only refers to workflows. The reason for
// this is because we do not get a list of jobs for
// completed workflows in order to keep the number of API
// calls to a minimum.
case "in_progress" :
inProgress ++
case "queued" :
queued ++
default :
unknown ++
}
}
}
}
2020-06-27 17:26:46 +09:00
2020-07-03 09:05:46 +09:00
for _ , repo := range repos {
user , repoName := repo [ 0 ] , repo [ 1 ]
2021-02-09 10:13:53 +09:00
workflowRuns , err := r . GitHubClient . ListRepositoryWorkflowRuns ( context . TODO ( ) , user , repoName )
2020-07-03 09:05:46 +09:00
if err != nil {
return nil , err
}
2021-02-09 10:13:53 +09:00
for _ , run := range workflowRuns {
2020-07-03 09:05:46 +09:00
total ++
// In May 2020, there are only 3 statuses.
// Follow the below links for more details:
// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
// - https://developer.github.com/v3/checks/runs/#create-a-check-run
2020-10-07 17:00:44 -07:00
switch run . GetStatus ( ) {
2020-07-03 09:05:46 +09:00
case "completed" :
completed ++
case "in_progress" :
2020-10-07 17:00:44 -07:00
listWorkflowJobs ( user , repoName , run . GetID ( ) , func ( ) { inProgress ++ } )
2020-07-03 09:05:46 +09:00
case "queued" :
2020-10-07 17:00:44 -07:00
listWorkflowJobs ( user , repoName , run . GetID ( ) , func ( ) { queued ++ } )
2020-07-03 09:05:46 +09:00
default :
unknown ++
}
2020-06-27 17:26:46 +09:00
}
}
2020-07-19 18:42:06 +09:00
minReplicas := * hra . Spec . MinReplicas
maxReplicas := * hra . Spec . MaxReplicas
2020-06-27 17:26:46 +09:00
necessaryReplicas := queued + inProgress
var desiredReplicas int
if necessaryReplicas < minReplicas {
desiredReplicas = minReplicas
} else if necessaryReplicas > maxReplicas {
desiredReplicas = maxReplicas
} else {
desiredReplicas = necessaryReplicas
}
rd . Status . Replicas = & desiredReplicas
2020-07-03 09:05:46 +09:00
replicas := desiredReplicas
2020-06-27 17:26:46 +09:00
r . Log . V ( 1 ) . Info (
"Calculated desired replicas" ,
"computed_replicas_desired" , desiredReplicas ,
"spec_replicas_min" , minReplicas ,
"spec_replicas_max" , maxReplicas ,
"workflow_runs_completed" , completed ,
"workflow_runs_in_progress" , inProgress ,
"workflow_runs_queued" , queued ,
"workflow_runs_unknown" , unknown ,
2021-02-16 12:44:51 +09:00
"namespace" , hra . Namespace ,
"runner_deployment" , rd . Name ,
"horizontal_runner_autoscaler" , hra . Name ,
2020-06-27 17:26:46 +09:00
)
return & replicas , nil
}
2020-12-12 15:48:19 -08:00
func ( r * HorizontalRunnerAutoscalerReconciler ) calculateReplicasByPercentageRunnersBusy ( rd v1alpha1 . RunnerDeployment , hra v1alpha1 . HorizontalRunnerAutoscaler ) ( * int , error ) {
ctx := context . Background ( )
minReplicas := * hra . Spec . MinReplicas
maxReplicas := * hra . Spec . MaxReplicas
metrics := hra . Spec . Metrics [ 0 ]
scaleUpThreshold := defaultScaleUpThreshold
scaleDownThreshold := defaultScaleDownThreshold
scaleUpFactor := defaultScaleUpFactor
scaleDownFactor := defaultScaleDownFactor
if metrics . ScaleUpThreshold != "" {
sut , err := strconv . ParseFloat ( metrics . ScaleUpThreshold , 64 )
if err != nil {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpThreshold cannot be parsed into a float64" )
}
scaleUpThreshold = sut
}
if metrics . ScaleDownThreshold != "" {
sdt , err := strconv . ParseFloat ( metrics . ScaleDownThreshold , 64 )
if err != nil {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownThreshold cannot be parsed into a float64" )
}
scaleDownThreshold = sdt
}
2021-02-16 17:16:26 +09:00
scaleUpAdjustment := metrics . ScaleUpAdjustment
if scaleUpAdjustment != 0 {
if metrics . ScaleUpAdjustment < 0 {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpAdjustment cannot be lower than 0" )
}
if metrics . ScaleUpFactor != "" {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[]: scaleUpAdjustment and scaleUpFactor cannot be specified together" )
}
} else if metrics . ScaleUpFactor != "" {
2020-12-12 15:48:19 -08:00
suf , err := strconv . ParseFloat ( metrics . ScaleUpFactor , 64 )
if err != nil {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpFactor cannot be parsed into a float64" )
}
scaleUpFactor = suf
}
2021-02-16 17:16:26 +09:00
scaleDownAdjustment := metrics . ScaleDownAdjustment
if scaleDownAdjustment != 0 {
if metrics . ScaleDownAdjustment < 0 {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownAdjustment cannot be lower than 0" )
}
if metrics . ScaleDownFactor != "" {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[]: scaleDownAdjustment and scaleDownFactor cannot be specified together" )
}
} else if metrics . ScaleDownFactor != "" {
2020-12-12 15:48:19 -08:00
sdf , err := strconv . ParseFloat ( metrics . ScaleDownFactor , 64 )
if err != nil {
return nil , errors . New ( "validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownFactor cannot be parsed into a float64" )
}
scaleDownFactor = sdf
}
// return the list of runners in namespace. Horizontal Runner Autoscaler should only be responsible for scaling resources in its own ns.
var runnerList v1alpha1 . RunnerList
if err := r . List ( ctx , & runnerList , client . InNamespace ( rd . Namespace ) ) ; err != nil {
return nil , err
}
runnerMap := make ( map [ string ] struct { } )
for _ , items := range runnerList . Items {
runnerMap [ items . Name ] = struct { } { }
}
2021-02-16 12:44:51 +09:00
var (
enterprise = rd . Spec . Template . Spec . Enterprise
organization = rd . Spec . Template . Spec . Organization
repository = rd . Spec . Template . Spec . Repository
)
2020-12-12 15:48:19 -08:00
// ListRunners will return all runners managed by GitHub - not restricted to ns
2021-02-16 12:44:51 +09:00
runners , err := r . GitHubClient . ListRunners (
ctx ,
enterprise ,
organization ,
repository )
2020-12-12 15:48:19 -08:00
if err != nil {
return nil , err
}
numRunners := len ( runnerList . Items )
numRunnersBusy := 0
for _ , runner := range runners {
if _ , ok := runnerMap [ * runner . Name ] ; ok && runner . GetBusy ( ) {
numRunnersBusy ++
}
}
var desiredReplicas int
fractionBusy := float64 ( numRunnersBusy ) / float64 ( numRunners )
if fractionBusy >= scaleUpThreshold {
2021-02-16 17:16:26 +09:00
if scaleUpAdjustment > 0 {
desiredReplicas = numRunners + scaleUpAdjustment
} else {
desiredReplicas = int ( math . Ceil ( float64 ( numRunners ) * scaleUpFactor ) )
}
2020-12-12 15:48:19 -08:00
} else if fractionBusy < scaleDownThreshold {
2021-02-16 17:16:26 +09:00
if scaleDownAdjustment > 0 {
desiredReplicas = numRunners - scaleDownAdjustment
} else {
desiredReplicas = int ( float64 ( numRunners ) * scaleDownFactor )
}
2020-12-12 15:48:19 -08:00
} else {
desiredReplicas = * rd . Spec . Replicas
}
2021-01-24 10:58:35 +09:00
2021-01-22 02:11:21 +01:00
if desiredReplicas < minReplicas {
desiredReplicas = minReplicas
} else if desiredReplicas > maxReplicas {
desiredReplicas = maxReplicas
}
2020-12-12 15:48:19 -08:00
r . Log . V ( 1 ) . Info (
"Calculated desired replicas" ,
"computed_replicas_desired" , desiredReplicas ,
"spec_replicas_min" , minReplicas ,
"spec_replicas_max" , maxReplicas ,
"current_replicas" , rd . Spec . Replicas ,
"num_runners" , numRunners ,
"num_runners_busy" , numRunnersBusy ,
2021-02-16 12:44:51 +09:00
"namespace" , hra . Namespace ,
"runner_deployment" , rd . Name ,
"horizontal_runner_autoscaler" , hra . Name ,
"enterprise" , enterprise ,
"organization" , organization ,
"repository" , repository ,
2020-12-12 15:48:19 -08:00
)
rd . Status . Replicas = & desiredReplicas
replicas := desiredReplicas
return & replicas , nil
}