Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Following the tradition of Unix/Linux `top` tools, `ktop` is a tool that display

## Installation

### kubectl plugin (recommended)
### kubectl plugin

```bash
kubectl krew install ktop
Expand Down
11 changes: 6 additions & 5 deletions cmd/ktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,19 @@ func NewKtopCmd() *cobra.Command {
}

// tryPrometheus attempts to create, start, and verify a prometheus metrics source.
// It performs a test scrape to verify connectivity before returning.
// It performs a connectivity test FIRST before starting the expensive collection.
func tryPrometheus(ctx context.Context, restConfig *rest.Config, cfg *promMetrics.PromConfig) (*promMetrics.PromMetricsSource, error) {
source, err := promMetrics.NewPromMetricsSource(restConfig, cfg)
if err != nil {
return nil, err
}
if err := source.Start(ctx); err != nil {
source.Stop()
// TEST FIRST - quick connectivity check before starting expensive collection
// This prevents hanging on Start() if the cluster is unreachable
if err := source.TestConnection(ctx); err != nil {
return nil, err
}
// Verify connectivity with a test scrape
if err := source.TestConnection(ctx); err != nil {
// THEN start collection (now non-blocking)
if err := source.Start(ctx); err != nil {
source.Stop()
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions docs/overrides/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
}

.tx-hero__logo {
height: 6rem;
margin-bottom: 1rem;
width: 200px;
height: auto;
margin-bottom: 0.5rem;
}

.tx-hero__content {
Expand Down
21 changes: 10 additions & 11 deletions prom/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,11 @@ func (cc *CollectorController) Stop() error {
// TestScrape performs a quick test to verify connectivity to prometheus endpoints.
// It makes a direct API call to test RBAC permissions for nodes/proxy.
// Returns nil if the metrics endpoints are accessible.
// This can be called before Start() to verify connectivity before starting collection.
func (cc *CollectorController) TestScrape(ctx context.Context) error {
cc.mutex.RLock()
if !cc.running {
cc.mutex.RUnlock()
return fmt.Errorf("controller is not running")
}
kubeConfig := cc.kubeConfig
cc.mutex.RUnlock()

// Create a quick client to test connectivity
clientset, err := kubernetes.NewForConfig(kubeConfig)
// Note: kubeConfig is set at construction time, no lock needed
clientset, err := kubernetes.NewForConfig(cc.kubeConfig)
if err != nil {
return fmt.Errorf("creating test client: %w", err)
}
Expand Down Expand Up @@ -128,8 +122,13 @@ func (cc *CollectorController) runCollector(ctx context.Context) {
return
}

// Run immediate first collection (don't wait for ticker)
cc.collectFromAllComponents(ctx)
// Run first collection NON-BLOCKING with timeout
// This prevents startup hangs - UI will show loading state while metrics populate
go func() {
firstCtx, cancel := context.WithTimeout(ctx, cc.config.Timeout)
defer cancel()
cc.collectFromAllComponents(firstCtx)
}()

ticker := time.NewTicker(cc.config.Interval)
defer ticker.Stop()
Expand Down
52 changes: 37 additions & 15 deletions prom/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,41 +97,58 @@ func (ks *KubernetesScraper) ScrapeComponent(ctx context.Context, component Comp
return ks.scrapeTarget(ctx, target)
}

// scrapeAllTargets scrapes all targets and merges results into a single ScrapedMetrics
// scrapeAllTargets scrapes all targets IN PARALLEL and merges results into a single ScrapedMetrics
// This is used for node-based components where we need metrics from all nodes
func (ks *KubernetesScraper) scrapeAllTargets(ctx context.Context, targets []*ScrapeTarget) (*ScrapedMetrics, error) {
if len(targets) == 0 {
return nil, fmt.Errorf("no targets to scrape")
}

// Merged result
// Result type for collecting scrape results from goroutines
type scrapeResult struct {
target *ScrapeTarget
metrics *ScrapedMetrics
err error
}

startTime := time.Now()
results := make(chan scrapeResult, len(targets))

// Scrape all targets in parallel
for _, target := range targets {
go func(t *ScrapeTarget) {
metrics, err := ks.scrapeTarget(ctx, t)
results <- scrapeResult{target: t, metrics: metrics, err: err}
}(target)
}

// Collect results and merge families
mergedFamilies := make(map[string]*MetricFamily)
var firstEndpoint string
var totalDuration time.Duration
startTime := time.Now()
var lastErr error

for _, target := range targets {
metrics, err := ks.scrapeTarget(ctx, target)
if err != nil {
lastErr = err
for i := 0; i < len(targets); i++ {
result := <-results
if result.err != nil {
lastErr = result.err
continue // Skip failed targets but continue with others
}

if firstEndpoint == "" {
firstEndpoint = metrics.Endpoint
firstEndpoint = result.metrics.Endpoint
}
totalDuration += metrics.ScrapeDuration
totalDuration += result.metrics.ScrapeDuration

// Merge families, adding node label to each time series
for name, family := range metrics.Families {
for name, family := range result.metrics.Families {
// Add node label to each time series in this family
for _, ts := range family.TimeSeries {
// Add node label if this is a node-based target
if target.NodeName != "" {
if result.target.NodeName != "" {
ts.Labels = append(ts.Labels, labels.Label{
Name: "node",
Value: target.NodeName,
Value: result.target.NodeName,
})
}
}
Expand Down Expand Up @@ -340,17 +357,22 @@ func (ks *KubernetesScraper) scrapeComponentPeriodically(ctx context.Context, co

// scrapeTarget scrapes metrics from a single target using RESTClient
func (ks *KubernetesScraper) scrapeTarget(ctx context.Context, target *ScrapeTarget) (*ScrapedMetrics, error) {
// Add per-request timeout to prevent indefinite blocking on slow/unresponsive nodes
reqCtx, cancel := context.WithTimeout(ctx, ks.config.Timeout)
defer cancel()

startTime := time.Now()

var result rest.Result
var endpoint string

// Build the appropriate RESTClient request based on target type
// Use reqCtx (with timeout) for all requests to prevent indefinite blocking
switch target.Component {
case ComponentAPIServer:
// API server metrics via direct path
endpoint = "/metrics"
result = ks.restClient.Get().AbsPath("/metrics").Do(ctx)
result = ks.restClient.Get().AbsPath("/metrics").Do(reqCtx)

case ComponentKubelet, ComponentCAdvisor:
// Node-based components via node proxy
Expand All @@ -360,7 +382,7 @@ func (ks *KubernetesScraper) scrapeTarget(ctx context.Context, target *ScrapeTar
Name(target.NodeName).
SubResource("proxy").
Suffix(target.Path).
Do(ctx)
Do(reqCtx)

case ComponentEtcd, ComponentScheduler, ComponentControllerManager, ComponentKubeProxy:
// Pod-based components via pod proxy
Expand All @@ -372,7 +394,7 @@ func (ks *KubernetesScraper) scrapeTarget(ctx context.Context, target *ScrapeTar
Name(podNameWithPort).
SubResource("proxy").
Suffix(target.Path).
Do(ctx)
Do(reqCtx)

default:
return nil, fmt.Errorf("unsupported component type: %s", target.Component)
Expand Down