Skip to content

Commit d103a92

Browse files
committed
fix activity reporting
1 parent 99e9398 commit d103a92

File tree

6 files changed

+162
-43
lines changed

6 files changed

+162
-43
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ This eliminates the need to hardcode deployment paths in your application comman
135135
### Process Management
136136
- `--conda-env` - Conda environment to activate before running command
137137
- `--workdir` - Working directory for the process
138-
- `--force-alive` - Force keep-alive to prevent idle culling (default: `true`)
139-
- `--no-force-alive` - Disable force keep-alive, report only real activity
138+
- `--keep-alive` - Always report activity to prevent idle culling (default: `false`)
140139

141140
### Git Repository
142141
- `--repo` - Git repository URL to clone before starting app

pkg/activity/tracker.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Package activity provides activity tracking for JupyterHub activity reporting
2+
package activity
3+
4+
import (
5+
"sync"
6+
"time"
7+
)
8+
9+
// Tracker records the last activity timestamp in a thread-safe manner
10+
// Used to track when the proxied application was last accessed for reporting to JupyterHub
11+
type Tracker struct {
12+
mu sync.RWMutex
13+
lastActivity *time.Time
14+
}
15+
16+
// NewTracker creates a new activity tracker
17+
func NewTracker() *Tracker {
18+
return &Tracker{}
19+
}
20+
21+
// RecordActivity records the current time as the last activity timestamp
22+
// This should be called on every HTTP request to the proxied application
23+
func (t *Tracker) RecordActivity() {
24+
now := time.Now().UTC()
25+
t.mu.Lock()
26+
t.lastActivity = &now
27+
t.mu.Unlock()
28+
}
29+
30+
// GetLastActivity returns the last recorded activity timestamp
31+
// Returns nil if no activity has been recorded yet
32+
func (t *Tracker) GetLastActivity() *time.Time {
33+
t.mu.RLock()
34+
defer t.mu.RUnlock()
35+
return t.lastActivity
36+
}

pkg/config/config.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ type Config struct {
1818
Command []string
1919
DestPort int
2020
CondaEnv string
21-
WorkDir string
22-
ForceAlive bool
21+
WorkDir string
22+
KeepAlive bool
2323
StripPrefix bool // Strip service prefix before forwarding (default: true for most apps)
2424

2525
// Git
@@ -86,12 +86,8 @@ Framework-agnostic - works with any web application (Streamlit, Voila, Panel, et
8686
"Conda environment to activate")
8787
rootCmd.Flags().StringVar(&cfg.WorkDir, "workdir", "",
8888
"Working directory for the process")
89-
rootCmd.Flags().BoolVar(&cfg.ForceAlive, "force-alive", true,
90-
"Force keep-alive (prevent idle culling)")
91-
92-
// Legacy compatibility flag that sets force-alive to false
93-
rootCmd.Flags().BoolVar(&cfg.ForceAlive, "no-force-alive", false,
94-
"Disable force keep-alive (report only real activity)")
89+
rootCmd.Flags().BoolVar(&cfg.KeepAlive, "keep-alive", false,
90+
"Always report activity to prevent idle culling (default: false, report actual activity)")
9591

9692
// Prefix handling (default: strip prefix like jhsingle-native-proxy)
9793
rootCmd.Flags().BoolVar(&cfg.StripPrefix, "strip-prefix", true,

pkg/hub/client.go

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"time"
1313

14+
"github.com/nebari-dev/jhub-app-proxy/pkg/activity"
1415
"github.com/nebari-dev/jhub-app-proxy/pkg/logger"
1516
)
1617

@@ -142,23 +143,79 @@ func (c *Client) NotifyActivity(ctx context.Context) error {
142143
return nil
143144
}
144145

146+
// NotifyActivityWithTime notifies JupyterHub of activity with a specific timestamp
147+
// This is used when keepAlive=false to report actual last activity time
148+
func (c *Client) NotifyActivityWithTime(ctx context.Context, timestamp time.Time) error {
149+
endpoint := fmt.Sprintf("%s/users/%s/activity", c.baseURL, c.username)
150+
151+
payload := ActivityPayload{
152+
LastActivity: timestamp,
153+
}
154+
155+
// Include server-specific activity if server name is set
156+
if c.servername != "" {
157+
payload.Servers = map[string]ServerActivity{
158+
c.servername: {
159+
LastActivity: timestamp,
160+
},
161+
}
162+
}
163+
164+
jsonData, err := json.Marshal(payload)
165+
if err != nil {
166+
return fmt.Errorf("failed to marshal activity payload: %w", err)
167+
}
168+
169+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBuffer(jsonData))
170+
if err != nil {
171+
return fmt.Errorf("failed to create request: %w", err)
172+
}
173+
174+
req.Header.Set("Authorization", fmt.Sprintf("token %s", c.apiToken))
175+
req.Header.Set("Content-Type", "application/json")
176+
177+
start := time.Now()
178+
resp, err := c.httpClient.Do(req)
179+
duration := time.Since(start)
180+
181+
if err != nil {
182+
c.logger.HubAPICall("POST", endpoint, 0, duration, err)
183+
return fmt.Errorf("failed to notify activity: %w", err)
184+
}
185+
defer resp.Body.Close()
186+
187+
c.logger.HubAPICall("POST", endpoint, resp.StatusCode, duration, nil)
188+
189+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
190+
body, _ := io.ReadAll(resp.Body)
191+
return fmt.Errorf("activity notification failed with status %d: %s",
192+
resp.StatusCode, string(body))
193+
}
194+
195+
c.logger.Debug("activity notification successful", "timestamp", timestamp)
196+
return nil
197+
}
198+
145199
// StartActivityReporter starts a background goroutine that periodically reports activity
146200
// Returns a cancel function to stop the reporter
147-
func (c *Client) StartActivityReporter(ctx context.Context, interval time.Duration, forceAlive bool) context.CancelFunc {
201+
//
202+
// If keepAlive is true: Always report current time (prevent idle culling)
203+
// If keepAlive is false: Only report when there's actual activity tracked by activityTracker
204+
func (c *Client) StartActivityReporter(ctx context.Context, interval time.Duration, keepAlive bool, activityTracker *activity.Tracker) context.CancelFunc {
148205
ctx, cancel := context.WithCancel(ctx)
149206

150207
go func() {
151208
c.logger.Info("starting activity reporter",
152209
"interval", interval,
153-
"force_alive", forceAlive,
210+
"keep_alive", keepAlive,
154211
"username", c.username,
155212
"servername", c.servername)
156213

157214
ticker := time.NewTicker(interval)
158215
defer ticker.Stop()
159216

160-
// Report activity immediately on start if force_alive is enabled
161-
if forceAlive {
217+
// Report activity immediately on start if keepAlive is enabled
218+
if keepAlive {
162219
if err := c.NotifyActivity(ctx); err != nil {
163220
c.logger.Error("failed to notify activity on start", err)
164221
}
@@ -170,13 +227,27 @@ func (c *Client) StartActivityReporter(ctx context.Context, interval time.Durati
170227
c.logger.Info("activity reporter stopped")
171228
return
172229
case <-ticker.C:
173-
// In force_alive mode, always report activity
174-
// In normal mode, only report if there was actual activity
175-
// (for now, we always report - activity tracking can be added later)
176-
if err := c.NotifyActivity(ctx); err != nil {
177-
c.logger.Error("failed to notify activity", err,
178-
"username", c.username,
179-
"servername", c.servername)
230+
if keepAlive {
231+
// Always report current time (keep alive forever)
232+
if err := c.NotifyActivity(ctx); err != nil {
233+
c.logger.Error("failed to notify activity", err,
234+
"username", c.username,
235+
"servername", c.servername)
236+
}
237+
} else {
238+
// Only report if there was actual activity
239+
lastActivity := activityTracker.GetLastActivity()
240+
if lastActivity != nil {
241+
if err := c.NotifyActivityWithTime(ctx, *lastActivity); err != nil {
242+
c.logger.Error("failed to notify activity", err,
243+
"username", c.username,
244+
"servername", c.servername,
245+
"last_activity", lastActivity)
246+
}
247+
} else {
248+
// No activity yet, don't send notification
249+
c.logger.Debug("no activity to report yet")
250+
}
180251
}
181252
}
182253
}

pkg/router/router.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net/http"
66
"strings"
77

8+
"github.com/nebari-dev/jhub-app-proxy/pkg/activity"
89
"github.com/nebari-dev/jhub-app-proxy/pkg/interim"
910
"github.com/nebari-dev/jhub-app-proxy/pkg/logger"
1011
"github.com/nebari-dev/jhub-app-proxy/pkg/process"
@@ -23,6 +24,7 @@ type Router struct {
2324
appRootPath string
2425
subprocessURL string
2526
oauthCallbackPath string // Empty if OAuth disabled for jhub-app-proxy
27+
activityTracker *activity.Tracker
2628
}
2729

2830
// Config contains configuration for the router
@@ -37,6 +39,7 @@ type Config struct {
3739
AppRootPath string
3840
SubprocessURL string
3941
OAuthCallbackPath string // Empty if OAuth disabled for jhub-app-proxy
42+
ActivityTracker *activity.Tracker
4043
}
4144

4245
// New creates a new router with the given configuration
@@ -52,6 +55,7 @@ func New(cfg Config) *Router {
5255
appRootPath: cfg.AppRootPath,
5356
subprocessURL: cfg.SubprocessURL,
5457
oauthCallbackPath: cfg.OAuthCallbackPath,
58+
activityTracker: cfg.ActivityTracker,
5559
}
5660
}
5761

@@ -147,5 +151,11 @@ func (rtr *Router) handleAppRunning(w http.ResponseWriter, r *http.Request, path
147151
"path", path,
148152
"backend_url", rtr.subprocessURL,
149153
"app_status", "running")
154+
155+
// Record activity for JupyterHub activity reporting
156+
if rtr.activityTracker != nil {
157+
rtr.activityTracker.RecordActivity()
158+
}
159+
150160
rtr.proxyHandler.ServeHTTP(w, r)
151161
}

pkg/server/server.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"syscall"
1212
"time"
1313

14+
"github.com/nebari-dev/jhub-app-proxy/pkg/activity"
1415
"github.com/nebari-dev/jhub-app-proxy/pkg/api"
1516
"github.com/nebari-dev/jhub-app-proxy/pkg/auth"
1617
"github.com/nebari-dev/jhub-app-proxy/pkg/config"
@@ -24,15 +25,16 @@ import (
2425

2526
// Server represents the HTTP server and its components
2627
type Server struct {
27-
httpServer *http.Server
28-
manager *process.ManagerWithLogs
29-
interimHandler *interim.Handler
30-
router *router.Router
31-
logger *logger.Logger
32-
config *config.Config
33-
proxyPort int
34-
subprocessPort int
35-
interimPath string
28+
httpServer *http.Server
29+
manager *process.ManagerWithLogs
30+
interimHandler *interim.Handler
31+
router *router.Router
32+
logger *logger.Logger
33+
config *config.Config
34+
proxyPort int
35+
subprocessPort int
36+
interimPath string
37+
activityTracker *activity.Tracker
3638
}
3739

3840
// Config contains all dependencies needed to create a server
@@ -143,6 +145,9 @@ func New(cfg Config) (*Server, error) {
143145
return nil, fmt.Errorf("failed to create proxy handler: %w", err)
144146
}
145147

148+
// Create activity tracker for JupyterHub activity reporting
149+
activityTracker := activity.NewTracker()
150+
146151
// Create main router
147152
mainRouter := router.New(router.Config{
148153
Logger: log,
@@ -155,6 +160,7 @@ func New(cfg Config) (*Server, error) {
155160
AppRootPath: appRootPath,
156161
SubprocessURL: cfg.SubprocessURL,
157162
OAuthCallbackPath: oauthCallbackPath, // Empty if OAuth disabled
163+
ActivityTracker: activityTracker,
158164
})
159165

160166
// Create HTTP server
@@ -164,15 +170,16 @@ func New(cfg Config) (*Server, error) {
164170
}
165171

166172
return &Server{
167-
httpServer: httpServer,
168-
manager: cfg.Manager,
169-
interimHandler: interimHandler,
170-
router: mainRouter,
171-
logger: log,
172-
config: cfg.AppConfig,
173-
proxyPort: cfg.ProxyPort,
174-
subprocessPort: cfg.SubprocessPort,
175-
interimPath: interimBasePath,
173+
httpServer: httpServer,
174+
manager: cfg.Manager,
175+
interimHandler: interimHandler,
176+
router: mainRouter,
177+
logger: log,
178+
config: cfg.AppConfig,
179+
proxyPort: cfg.ProxyPort,
180+
subprocessPort: cfg.SubprocessPort,
181+
interimPath: interimBasePath,
182+
activityTracker: activityTracker,
176183
}, nil
177184
}
178185

@@ -216,7 +223,7 @@ func (s *Server) StartSubprocess(ctx context.Context, cmd []string) {
216223
s.interimHandler.MarkAppDeployed()
217224

218225
if s.config.AuthType == "oauth" {
219-
if err := startActivityReporter(ctx, s.config, s.logger); err != nil {
226+
if err := startActivityReporter(ctx, s.config, s.logger, s.activityTracker); err != nil {
220227
s.logger.Warn("failed to start activity reporter (continuing anyway)", "error", err)
221228
}
222229
}
@@ -270,7 +277,7 @@ func GetServicePrefix(log *logger.Logger) string {
270277
}
271278

272279
// startActivityReporter starts the JupyterHub activity reporter
273-
func startActivityReporter(ctx context.Context, cfg *config.Config, log *logger.Logger) error {
280+
func startActivityReporter(ctx context.Context, cfg *config.Config, log *logger.Logger, activityTracker *activity.Tracker) error {
274281
hubClient, err := hub.NewClientFromEnv(log)
275282
if err != nil {
276283
return fmt.Errorf("failed to create hub client: %w", err)
@@ -281,11 +288,11 @@ func startActivityReporter(ctx context.Context, cfg *config.Config, log *logger.
281288
}
282289

283290
interval := 5 * time.Minute
284-
_ = hubClient.StartActivityReporter(ctx, interval, cfg.ForceAlive)
291+
_ = hubClient.StartActivityReporter(ctx, interval, cfg.KeepAlive, activityTracker)
285292

286293
log.Info("activity reporter started",
287294
"interval", interval,
288-
"force_alive", cfg.ForceAlive)
295+
"keep_alive", cfg.KeepAlive)
289296

290297
return nil
291298
}

0 commit comments

Comments
 (0)