Skip to content

Commit bcbf370

Browse files
Merge pull request #603 from WheelyMcBones/pod-va-indexer
feat: using indexes to optimize Pod-VariantAutoscaling mapping
2 parents 02de2a0 + 2a61999 commit bcbf370

File tree

13 files changed

+1009
-257
lines changed

13 files changed

+1009
-257
lines changed

charts/workload-variant-autoscaler/templates/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ rules:
6262
- patch
6363
- update
6464
- watch
65+
- apiGroups:
66+
- apps
67+
resources:
68+
- replicasets
69+
verbs:
70+
- get
71+
- list
72+
- watch
6573
- apiGroups:
6674
- llmd.ai
6775
resources:

charts/workload-variant-autoscaler/templates/variantautoscaling.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ spec:
1818
# ScaleTargetRef references the target resource to scale (similar to HPA)
1919
# TODO: Support templating for scaleTargetRef to enable managing groups of deployments
2020
scaleTargetRef:
21+
apiVersion: apps/v1
2122
kind: Deployment
2223
name: {{ .Values.llmd.deploymentName | default (printf "%s-decode" .Values.llmd.modelName) }}
2324
# OpenAI API compatible name of the model

cmd/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/datastore"
5151
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/engines/saturation"
5252
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/engines/scalefromzero"
53+
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/indexers"
5354
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/logging"
5455
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/metrics"
5556
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/utils"
@@ -304,6 +305,14 @@ func main() {
304305
os.Exit(1)
305306
}
306307

308+
// Set up custom indexes for lookups on VariantAutoscalings
309+
setupLog.Info("Setting up indexes")
310+
if err := indexers.SetupIndexes(context.Background(), mgr); err != nil {
311+
setupLog.Error(err, "unable to setup indexes")
312+
os.Exit(1)
313+
}
314+
setupLog.Info("Indexes setup completed")
315+
307316
// Initialize metrics
308317
setupLog.Info("Creating metrics emitter instance")
309318
// Force initialization of metrics by creating a metrics emitter

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ rules:
5656
- patch
5757
- update
5858
- watch
59+
- apiGroups:
60+
- apps
61+
resources:
62+
- replicasets
63+
verbs:
64+
- get
65+
- list
66+
- watch
5967
- apiGroups:
6068
- llmd.ai
6169
resources:

internal/collector/replica_metrics.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/interfaces"
3636
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/logging"
3737
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/saturation"
38+
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/utils"
3839
)
3940

4041
// ReplicaMetricsCollector collects replica-level metrics for saturation analysis
@@ -65,9 +66,9 @@ func NewReplicaMetricsCollector(metricsSource source.MetricsSource, k8sClient cl
6566
// - ctx: Context for the operation
6667
// - modelID: The model identifier to collect metrics for
6768
// - namespace: The namespace where the model is deployed
68-
// - deployments: Map of deployment name to deployment object
69-
// - variantAutoscalings: Map of deployment name to VA object
70-
// - variantCosts: Map of deployment name to cost value
69+
// - deployments: Map of Deployment namespace/name to Deployment object
70+
// - variantAutoscalings: Map of VariantAutoscaling namespace/name to VariantAutoscaling object
71+
// - variantCosts: Map of VariantAutoscaling namespace/name to cost value
7172
//
7273
// Returns:
7374
// - []interfaces.ReplicaMetrics: Per-pod metrics for saturation analysis
@@ -197,30 +198,31 @@ func (c *ReplicaMetricsCollector) CollectReplicaMetrics(
197198
queueLen = 0
198199
}
199200

200-
// Match pod to variant using deployment label selectors
201-
variantName := c.podVAMapper.FindVAForPod(ctx, podName, namespace, deployments, variantAutoscalings)
201+
// Match Pod to VariantAutoscaling using indexed lookup
202+
vaName := c.podVAMapper.FindVAForPod(ctx, podName, namespace, deployments)
202203

203-
if variantName == "" {
204+
if vaName == "" {
204205
logger.Info("Skipping pod that doesn't match any deployment",
205206
"pod", podName,
206207
"deployments", getDeploymentNames(deployments))
207208
continue
208209
}
210+
variantKey := utils.GetNamespacedKey(namespace, vaName)
209211

210212
// Get accelerator name from VariantAutoscaling label
211213
acceleratorName := ""
212-
if va, ok := variantAutoscalings[variantName]; ok && va != nil {
214+
if va, ok := variantAutoscalings[variantKey]; ok && va != nil {
213215
if va.Labels != nil {
214-
if accName, exists := va.Labels["inference.optimization/acceleratorName"]; exists {
216+
if accName, exists := va.Labels[utils.AcceleratorNameLabel]; exists {
215217
acceleratorName = accName
216218
}
217219
}
218220
}
219221

220-
// Look up cost by variant name
222+
// Look up cost by VariantAutoscaling namespace/name
221223
cost := saturation.DefaultVariantCost
222224
if variantCosts != nil {
223-
if c, ok := variantCosts[variantName]; ok {
225+
if c, ok := variantCosts[variantKey]; ok {
224226
cost = c
225227
}
226228
}
@@ -229,7 +231,7 @@ func (c *ReplicaMetricsCollector) CollectReplicaMetrics(
229231
PodName: podName,
230232
ModelID: modelID,
231233
Namespace: namespace,
232-
VariantName: variantName,
234+
VariantName: vaName,
233235
AcceleratorName: acceleratorName,
234236
KvCacheUsage: kvUsage,
235237
QueueLength: queueLen,
@@ -255,8 +257,8 @@ func (c *ReplicaMetricsCollector) CollectReplicaMetrics(
255257
// getDeploymentNames extracts deployment names from the deployments map.
256258
func getDeploymentNames(deployments map[string]*appsv1.Deployment) []string {
257259
names := make([]string, 0, len(deployments))
258-
for name := range deployments {
259-
names = append(names, name)
260+
for _, deploy := range deployments {
261+
names = append(names, deploy.Name)
260262
}
261263
return names
262264
}

internal/collector/source/pod_va_mapper.go

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import (
99
ctrl "sigs.k8s.io/controller-runtime"
1010
"sigs.k8s.io/controller-runtime/pkg/client"
1111

12-
llmdv1alpha1 "github.com/llm-d-incubation/workload-variant-autoscaler/api/v1alpha1"
12+
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/indexers"
13+
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/logging"
1314
)
1415

1516
// PodVAMapper maps pod names to their corresponding VariantAutoscaling objects.
@@ -24,26 +25,39 @@ func NewPodVAMapper(k8sClient client.Client) *PodVAMapper {
2425
}
2526
}
2627

27-
// FindVAForPod finds the VariantAutoscaling object for a pod by first finding
28-
// its deployment and then finding the VA that targets that deployment.
29-
//
30-
// Returns the VA name if found, empty string otherwise.
28+
// FindVAForPod finds the VariantAutoscaling object for a Pod by:
29+
// 1. finding the Deployment owning the Pod
30+
// 2. finding the VariantAutoscaling that targets that Deployment, using indexed lookups.
31+
// Returns the VariantAutoscaling name if found, empty string otherwise.
3132
func (m *PodVAMapper) FindVAForPod(
3233
ctx context.Context,
3334
podName string,
3435
namespace string,
3536
deployments map[string]*appsv1.Deployment,
36-
variantAutoscalings map[string]*llmdv1alpha1.VariantAutoscaling,
3737
) string {
38+
logger := ctrl.LoggerFrom(ctx)
39+
3840
deploymentName := m.findDeploymentForPod(ctx, podName, namespace, deployments)
3941
if deploymentName == "" {
4042
return ""
4143
}
42-
return m.findVAForDeployment(deploymentName, namespace, variantAutoscalings)
44+
45+
// Use indexed lookup for VariantAutoscaling targeting this Deployment
46+
va, err := indexers.FindVAForDeployment(ctx, m.k8sClient, deploymentName, namespace)
47+
if err != nil {
48+
logger.V(logging.DEBUG).Error(err, "failed to find VariantAutoscaling for deployment", "deployment", deploymentName, "namespace", namespace)
49+
return ""
50+
}
51+
52+
if va == nil {
53+
logger.V(logging.DEBUG).Info("no VariantAutoscaling matched for deployment", "deployment", deploymentName, "namespace", namespace)
54+
return ""
55+
}
56+
57+
return va.Name
4358
}
4459

45-
// findDeploymentForPod finds which deployment owns a pod using Kubernetes API.
46-
// Uses the deployment's label selector to find matching pods.
60+
// findDeploymentForPod finds which Deployment owns a Pod by traversing owner references.
4761
func (m *PodVAMapper) findDeploymentForPod(
4862
ctx context.Context,
4963
podName string,
@@ -52,48 +66,34 @@ func (m *PodVAMapper) findDeploymentForPod(
5266
) string {
5367
logger := ctrl.LoggerFrom(ctx)
5468

55-
// TODO: optimize
56-
for deploymentName, deployment := range deployments {
57-
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
58-
if err != nil {
59-
logger.Info("Invalid label selector for deployment", "deployment", deploymentName, "error", err)
60-
continue
61-
}
62-
63-
podList := &corev1.PodList{}
64-
listOpts := &client.ListOptions{
65-
Namespace: namespace,
66-
LabelSelector: selector,
67-
}
68-
69-
if err := m.k8sClient.List(ctx, podList, listOpts); err != nil {
70-
logger.Info("Failed to list pods for deployment", "deployment", deploymentName, "error", err)
71-
continue
72-
}
73-
74-
for _, pod := range podList.Items {
75-
if pod.Name == podName {
76-
return deploymentName
77-
}
78-
}
69+
pod := &corev1.Pod{}
70+
if err := m.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: podName}, pod); err != nil {
71+
logger.V(logging.DEBUG).Error(err, "failed to get pod", "pod", podName, "namespace", namespace)
72+
return ""
7973
}
80-
return ""
81-
}
8274

83-
// findVAForDeployment finds the VariantAutoscaling object that targets a deployment.
84-
func (m *PodVAMapper) findVAForDeployment(
85-
deploymentName string,
86-
namespace string,
87-
variantAutoscalings map[string]*llmdv1alpha1.VariantAutoscaling,
88-
) string {
89-
for vaName, va := range variantAutoscalings {
90-
if va == nil {
91-
continue
92-
}
93-
if va.Spec.ScaleTargetRef.Name == deploymentName &&
94-
va.Namespace == namespace {
95-
return vaName
96-
}
75+
owner := metav1.GetControllerOf(pod)
76+
if owner == nil || owner.Kind != "ReplicaSet" {
77+
logger.V(logging.DEBUG).Info("Pod has no ReplicaSet owner", "pod", podName, "namespace", namespace)
78+
return ""
79+
}
80+
81+
rs := &appsv1.ReplicaSet{}
82+
if err := m.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: owner.Name}, rs); err != nil {
83+
logger.V(logging.DEBUG).Error(err, "failed to get ReplicaSet", "replicaset", owner.Name, "namespace", namespace)
84+
return ""
85+
}
86+
87+
rsOwner := metav1.GetControllerOf(rs)
88+
if rsOwner == nil || rsOwner.Kind != "Deployment" {
89+
logger.V(logging.DEBUG).Info("ReplicaSet has no Deployment owner", "replicaset", owner.Name, "namespace", namespace)
90+
return ""
91+
}
92+
93+
// Verify the Deployment is in our map of tracked Deployments
94+
deploymentKey := namespace + "/" + rsOwner.Name
95+
if deploy, ok := deployments[deploymentKey]; ok && deploy != nil && deploy.Namespace == namespace {
96+
return rsOwner.Name
9797
}
9898
return ""
9999
}

0 commit comments

Comments
 (0)