Skip to content

Commit 8ea97e4

Browse files
committed
More concurrency fixes
Signed-off-by: Babis Chalios <babis.chalios@e2b.dev>
1 parent c98e01e commit 8ea97e4

File tree

2 files changed

+49
-31
lines changed

2 files changed

+49
-31
lines changed

packages/orchestrator/internal/sandbox/uffd/userfaultfd/prefault.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ func (u *Userfaultfd) Prefault(ctx context.Context, offset int64, data []byte) e
2323
return fmt.Errorf("data length (%d) does not match pagesize (%d)", len(data), u.pageSize)
2424
}
2525

26-
return u.faultPage(ctx, nil, addr, offset, u.pageSize, directDataSource{data, int64(u.pageSize)}, nil, block.Prefetch)
26+
_, err = u.faultPage(ctx, addr, offset, u.pageSize, directDataSource{data, int64(u.pageSize)}, nil, block.Prefetch)
27+
28+
return err
2729
}
2830

2931
// directDataSource wraps a byte slice to implement block.Slicer for prefaulting.

packages/orchestrator/internal/sandbox/uffd/userfaultfd/userfaultfd.go

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,9 @@ func (u *Userfaultfd) Serve(
275275
u.pageTracker.markRemovedRange(start, end)
276276
}
277277

278-
for _, pfEvent := range pfEvents {
278+
for i := range pfEvents {
279+
pfEvent := pfEvents[i]
280+
279281
// We don't handle minor page faults
280282
if pfEvent.flags&UFFD_PAGEFAULT_FLAG_MINOR != 0 {
281283
return fmt.Errorf("unexpected MINOR pagefault event, closing UFFD")
@@ -307,10 +309,38 @@ func (u *Userfaultfd) Serve(
307309
case faulted:
308310
continue
309311
case removed:
310-
u.wg.Go(func() error { return u.zeroPage(&pfEvent, addr, u.pageSize) })
312+
u.wg.Go(func() error {
313+
handled, err := u.zeroPage(addr, u.pageSize)
314+
if err != nil {
315+
return err
316+
}
317+
318+
if !handled {
319+
// This happens when a remove event arrives in the UFFD file descriptor while
320+
// we are trying to copy()/zero() a page. We need to retry.
321+
u.deferredRWMutex.Lock()
322+
defer u.deferredRWMutex.Unlock()
323+
324+
u.deferredPfEvents = append(u.deferredPfEvents, pfEvent)
325+
}
326+
327+
return nil
328+
})
311329
case unfaulted:
312330
u.wg.Go(func() error {
313-
return u.faultPage(ctx, &pfEvent, addr, offset, u.pageSize, u.src, fdExit.SignalExit, accessType)
331+
handled, err := u.faultPage(ctx, addr, offset, u.pageSize, u.src, fdExit.SignalExit, accessType)
332+
if err != nil {
333+
return err
334+
}
335+
336+
if !handled {
337+
u.deferredRWMutex.Lock()
338+
defer u.deferredRWMutex.Unlock()
339+
340+
u.deferredPfEvents = append(u.deferredPfEvents, pfEvent)
341+
}
342+
343+
return nil
314344
})
315345
default:
316346
return fmt.Errorf("unexpected pageState: %#v", state)
@@ -340,10 +370,9 @@ func (u *Userfaultfd) PrefetchData() block.PrefetchData {
340370
}
341371

342372
func (u *Userfaultfd) zeroPage(
343-
pfEvent *UffdPagefault,
344373
addr uintptr,
345374
pagesize uintptr,
346-
) error {
375+
) (bool, error) {
347376
var err error
348377

349378
if pagesize == 4096 {
@@ -365,39 +394,31 @@ func (u *Userfaultfd) zeroPage(
365394
if errors.Is(err, unix.EEXIST) {
366395
u.pageTracker.markFaulted(addr)
367396

368-
return nil
397+
return true, nil
369398
}
370399

371400
if errors.Is(err, unix.EAGAIN) {
372-
// This happens when a remove event arrives in the UFFD file descriptor while
373-
// we are trying to copy()/zero() a page. We need to retry.
374-
u.deferredRWMutex.Lock()
375-
defer u.deferredRWMutex.Lock()
376-
377-
if pfEvent != nil {
378-
u.deferredPfEvents = append(u.deferredPfEvents, *pfEvent)
379-
}
380-
381-
return nil
401+
return false, nil
382402
}
383403

384-
if err != nil {
404+
if err == nil {
385405
u.pageTracker.markFaulted(addr)
406+
407+
return true, nil
386408
}
387409

388-
return err
410+
return false, err
389411
}
390412

391413
func (u *Userfaultfd) faultPage(
392414
ctx context.Context,
393-
pfEvent *UffdPagefault,
394415
addr uintptr,
395416
offset int64,
396417
pagesize uintptr,
397418
source block.Slicer,
398419
onFailure func() error,
399420
accessType block.AccessType,
400-
) error {
421+
) (bool, error) {
401422
span := trace.SpanFromContext(ctx)
402423

403424
// The RLock must be called inside the goroutine to ensure RUnlock runs via defer,
@@ -425,7 +446,7 @@ func (u *Userfaultfd) faultPage(
425446
span.RecordError(joinedErr)
426447
u.logger.Error(ctx, "UFFD serve data fetch error", zap.Error(joinedErr))
427448

428-
return fmt.Errorf("failed to read from source: %w", joinedErr)
449+
return false, fmt.Errorf("failed to read from source: %w", joinedErr)
429450
}
430451

431452
var copyMode CULong
@@ -443,18 +464,13 @@ func (u *Userfaultfd) faultPage(
443464
span.SetAttributes(attribute.Bool("uffd.already_mapped", true))
444465
u.pageTracker.markFaulted(addr)
445466

446-
return nil
467+
return true, nil
447468
}
448469

449470
if errors.Is(copyErr, unix.EAGAIN) {
450471
u.logger.Debug(ctx, "UFFD copy page EAGAIN. Ignoring", zap.Uintptr("addr", addr))
451472

452-
u.deferredRWMutex.Lock()
453-
defer u.deferredRWMutex.Unlock()
454-
455-
u.deferredPfEvents = append(u.deferredPfEvents, *pfEvent)
456-
457-
return nil
473+
return false, nil
458474
}
459475

460476
if copyErr != nil {
@@ -468,13 +484,13 @@ func (u *Userfaultfd) faultPage(
468484
span.RecordError(joinedErr)
469485
u.logger.Error(ctx, "UFFD serve uffdio copy error", zap.Error(joinedErr))
470486

471-
return fmt.Errorf("failed uffdio copy %w", joinedErr)
487+
return false, fmt.Errorf("failed uffdio copy %w", joinedErr)
472488
}
473489

474490
// Add the offset to the missing requests tracker with metadata.
475491
u.missingRequests.Add(offset)
476492
u.prefetchTracker.Add(offset, accessType)
477493
u.pageTracker.markFaulted(addr)
478494

479-
return nil
495+
return true, nil
480496
}

0 commit comments

Comments
 (0)