Skip to content

Commit bf04bd4

Browse files
committed
[YUNIKORN-2772] Scheduler restart does not preserve app start time (apache#1018)
Closes: apache#1018 Signed-off-by: Peter Bacsko <[email protected]>
1 parent c56c288 commit bf04bd4

File tree

7 files changed

+129
-23
lines changed

7 files changed

+129
-23
lines changed

pkg/scheduler/objects/application.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,9 @@ type StateLogEntry struct {
7777
}
7878

7979
type Application struct {
80-
ApplicationID string // application ID
81-
Partition string // partition Name
82-
SubmissionTime time.Time // time application was submitted
83-
tags map[string]string // application tags used in scheduling
80+
ApplicationID string // application ID
81+
Partition string // partition Name
82+
tags map[string]string // application tags used in scheduling
8483

8584
// Private mutable fields need protection
8685
queuePath string
@@ -91,6 +90,7 @@ type Application struct {
9190
sortedRequests sortedRequests // list of requests pre-sorted
9291
user security.UserGroup // owner of the application
9392
allocatedResource *resources.Resource // total allocated resources
93+
submissionTime time.Time // time application was submitted (based on the first ask)
9494

9595
usedResource *resources.TrackedResource // keep track of resource usage of the application
9696
preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
@@ -128,7 +128,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
128128
app := &Application{
129129
ApplicationID: siApp.ApplicationID,
130130
Partition: siApp.PartitionName,
131-
SubmissionTime: time.Now(),
131+
submissionTime: time.Now(),
132132
queuePath: siApp.QueueName,
133133
tags: siApp.Tags,
134134
pending: resources.NewResource(),
@@ -175,7 +175,7 @@ func (sa *Application) String() string {
175175
return "application is nil"
176176
}
177177
return fmt.Sprintf("applicationID: %s, Partition: %s, SubmissionTime: %x, State: %s",
178-
sa.ApplicationID, sa.Partition, sa.SubmissionTime, sa.stateMachine.Current())
178+
sa.ApplicationID, sa.Partition, sa.GetSubmissionTime(), sa.stateMachine.Current())
179179
}
180180

181181
func (sa *Application) SetState(state string) {
@@ -597,6 +597,9 @@ func (sa *Application) AddAllocationAsk(ask *Allocation) error {
597597
if ask.IsAllocated() || resources.IsZero(ask.GetAllocatedResource()) {
598598
return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
599599
}
600+
if ask.createTime.Before(sa.submissionTime) {
601+
sa.submissionTime = ask.createTime
602+
}
600603
delta := ask.GetAllocatedResource().Clone()
601604

602605
var oldAskResource *resources.Resource = nil
@@ -2099,7 +2102,7 @@ func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
20992102
func (sa *Application) getApplicationSummary(rmID string) *ApplicationSummary {
21002103
return &ApplicationSummary{
21012104
ApplicationID: sa.ApplicationID,
2102-
SubmissionTime: sa.SubmissionTime,
2105+
SubmissionTime: sa.submissionTime,
21032106
StartTime: sa.startTime,
21042107
FinishTime: sa.finishedTime,
21052108
User: sa.user.User,
@@ -2262,3 +2265,9 @@ func (sa *Application) getResourceFromTags(tag string) *resources.Resource {
22622265

22632266
return resource
22642267
}
2268+
2269+
// GetSubmissionTime returns the application's submission time.
// The value is read while holding the application's read lock, so it is safe
// to call from code that does not already hold the application lock.
// The field starts out as the time the Application object was created and is
// moved earlier by AddAllocationAsk whenever an ask carries an older create time.
func (sa *Application) GetSubmissionTime() time.Time {
2270+
sa.RLock()
2271+
defer sa.RUnlock()
2272+
return sa.submissionTime
2273+
}

pkg/scheduler/objects/application_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3650,3 +3650,26 @@ func TestTryNodesNoReserve(t *testing.T) {
36503650
assert.Equal(t, result.ResultType, AllocatedReserved, "result type should be AllocatedReserved")
36513651
assert.Equal(t, result.ReservedNodeID, node1.NodeID, "reserved node should be node1")
36523652
}
3653+
3654+
// TestAppSubmissionTime checks that AddAllocationAsk keeps the application's
// submission time equal to the earliest create time of any ask added so far:
// a later ask must not change it, an earlier ask must lower it.
// NOTE(review): the asks are built for appID1 while the application under test
// is created with appID0 — presumably AddAllocationAsk does not validate the
// application ID of the ask; confirm this mismatch is intentional.
func TestAppSubmissionTime(t *testing.T) {
3655+
app := newApplication(appID0, "default", "root.default")
3656+
queue, err := createRootQueue(map[string]string{"first": "5"})
3657+
assert.NilError(t, err, "queue create failed")
3658+
app.queue = queue
3659+
3660+
res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
// first ask: create time 100ns after the Unix epoch, older than the app's creation time
3661+
ask1 := newAllocationAsk(aKey, appID1, res)
3662+
ask1.createTime = time.Unix(0, 100)
3663+
err = app.AddAllocationAsk(ask1)
3664+
assert.NilError(t, err)
// second ask is newer (200ns) than the first one: submission time must stay at 100ns
3665+
ask2 := newAllocationAsk(aKey2, appID1, res)
3666+
ask2.createTime = time.Unix(0, 200)
3667+
err = app.AddAllocationAsk(ask2)
3668+
assert.NilError(t, err)
3669+
assert.Equal(t, app.submissionTime, time.Unix(0, 100), "app submission time is not set properly")
// third ask is older (50ns) than everything seen so far: submission time must move back to 50ns
3670+
ask3 := newAllocationAsk(aKey3, appID1, res)
3671+
ask3.createTime = time.Unix(0, 50)
3672+
err = app.AddAllocationAsk(ask3)
3673+
assert.NilError(t, err)
3674+
assert.Equal(t, app.submissionTime, time.Unix(0, 50), "app submission time is not set properly")
3675+
}

pkg/scheduler/objects/sorters.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,12 @@ func sortApplicationsBySubmissionTimeAndPriority(sortedApps []*Application) {
150150
sort.SliceStable(sortedApps, func(i, j int) bool {
151151
l := sortedApps[i]
152152
r := sortedApps[j]
153-
if l.SubmissionTime.Before(r.SubmissionTime) {
153+
lSubTime := l.GetSubmissionTime()
154+
rSubTime := r.GetSubmissionTime()
155+
if lSubTime.Before(rSubTime) {
154156
return true
155157
}
156-
if r.SubmissionTime.Before(l.SubmissionTime) {
158+
if rSubTime.Before(lSubTime) {
157159
return false
158160
}
159161
return l.GetAskMaxPriority() > r.GetAskMaxPriority()
@@ -172,7 +174,7 @@ func sortApplicationsByPriorityAndSubmissionTime(sortedApps []*Application) {
172174
if leftPriority < rightPriority {
173175
return false
174176
}
175-
return l.SubmissionTime.Before(r.SubmissionTime)
177+
return l.GetSubmissionTime().Before(r.GetSubmissionTime())
176178
})
177179
}
178180

pkg/scheduler/objects/sorters_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ func TestSortAppsFifo(t *testing.T) {
226226
app := newApplication(appID, "partition", "queue")
227227
app.pending = res
228228
input[appID] = app
229-
// make sure the time stamps differ at least a bit (tracking in nano seconds)
230-
time.Sleep(time.Nanosecond * 5)
229+
app.submissionTime = time.Unix(int64(i), 0)
231230
}
232231

233232
// fifo - apps should come back in order created 0, 1, 2, 3
@@ -236,8 +235,8 @@ func TestSortAppsFifo(t *testing.T) {
236235

237236
input["app-1"].askMaxPriority = 3
238237
input["app-3"].askMaxPriority = 5
239-
input["app-2"].SubmissionTime = input["app-3"].SubmissionTime
240-
input["app-1"].SubmissionTime = input["app-3"].SubmissionTime
238+
input["app-2"].submissionTime = input["app-3"].submissionTime
239+
input["app-1"].submissionTime = input["app-3"].submissionTime
241240
list = sortApplications(input, policies.FifoSortPolicy, false, nil)
242241
/*
243242
* apps order: 0, 3, 1, 2
@@ -410,7 +409,7 @@ func TestSortBySubmissionTime(t *testing.T) {
410409
app := newApplication(appID, "partition", "queue")
411410
app.pending = res
412411
input[appID] = app
413-
app.SubmissionTime = baseline.Add(-time.Minute * time.Duration(i))
412+
app.submissionTime = baseline.Add(-time.Minute * time.Duration(i))
414413
input[appID] = app
415414
}
416415

pkg/scheduler/partition_test.go

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
323323
assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active placeholders")
324324

325325
// fake an ask that is used
326-
ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false)
326+
ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false, nil)
327327
err = app.AddAllocationAsk(ask)
328328
assert.NilError(t, err, "ask should be added to app")
329329
_, err = app.AllocateAsk(allocKey)
@@ -743,7 +743,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
743743
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not updated as expected")
744744

745745
// fake an ask that is used
746-
ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false)
746+
ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false, nil)
747747
err = app.AddAllocationAsk(ask)
748748
assert.NilError(t, err, "ask should be added to app")
749749
_, err = app.AllocateAsk(allocKey)
@@ -817,7 +817,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
817817
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not updated as expected")
818818

819819
// fake an ask that is used
820-
ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false)
820+
ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false, nil)
821821
err = app.AddAllocationAsk(ask)
822822
assert.NilError(t, err, "ask should be added to app")
823823
_, err = app.AllocateAsk(allocKey)
@@ -4800,3 +4800,75 @@ func TestForeignAllocation(t *testing.T) { //nolint:funlen
48004800
assert.Equal(t, 0, len(partition.foreignAllocs))
48014801
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
48024802
}
4803+
4804+
// TestAppSchedulingOrderFIFO checks that, in a queue using the FIFO application
// sort policy, applications are scheduled in the order given by their asks'
// creation-time tags rather than by the order in which the applications were
// added to the partition. app1 is added first but its only ask carries the
// largest creation time, so both asks of app2 are expected to be allocated
// before it.
// NOTE(review): the siCommon.CreationTime tag is presumably parsed into the
// ask's create time by objects.NewAllocationFromSI; the unit of the tag value
// is not visible here — confirm.
func TestAppSchedulingOrderFIFO(t *testing.T) {
4805+
setupUGM()
4806+
partition, err := newBasePartition()
4807+
assert.NilError(t, err, "partition create failed")
// replace the base config with a single leaf queue that sorts its applications FIFO
4808+
conf := configs.PartitionConfig{
4809+
Name: "test",
4810+
Queues: []configs.QueueConfig{
4811+
{
4812+
Name: "root",
4813+
Parent: true,
4814+
SubmitACL: "*",
4815+
Queues: []configs.QueueConfig{
4816+
{
4817+
Name: "default",
4818+
Parent: false,
4819+
Properties: map[string]string{configs.ApplicationSortPolicy: policies.FifoSortPolicy.String()},
4820+
},
4821+
},
4822+
},
4823+
},
4824+
PlacementRules: nil,
4825+
Limits: nil,
4826+
NodeSortPolicy: configs.NodeSortingPolicy{},
4827+
}
4828+
err = partition.updatePartitionDetails(conf)
4829+
assert.NilError(t, err, "unable to update partition config")
4830+
4831+
// one node with 10 "first": enough for all three asks below (2 + 2 + 1)
nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
4832+
node := newNodeMaxResource(nodeID1, nodeRes)
4833+
err = partition.AddNode(node)
4834+
assert.NilError(t, err)
4835+
4836+
// app1 is registered before app2
app1 := newApplication(appID1, "default", defQueue)
4837+
err = partition.AddApplication(app1)
4838+
assert.NilError(t, err, "could not add application")
4839+
app2 := newApplication(appID2, "default", defQueue)
4840+
err = partition.AddApplication(app2)
4841+
assert.NilError(t, err, "could not add application")
4842+
4843+
// add two asks to app2 first; the second one (allocKey3) carries the older creation time
4844+
app2AskRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
4845+
4846+
app2Ask1 := newAllocationAskAll(allocKey2, appID2, "", app2AskRes, 0, false, map[string]string{
4847+
siCommon.CreationTime: "100",
4848+
})
4849+
err = app2.AddAllocationAsk(app2Ask1)
4850+
assert.NilError(t, err, "could not add ask")
4851+
app2Ask2 := newAllocationAskAll(allocKey3, appID2, "", app2AskRes, 0, false, map[string]string{
4852+
siCommon.CreationTime: "50",
4853+
})
4854+
err = app2.AddAllocationAsk(app2Ask2)
4855+
assert.NilError(t, err, "could not add ask")
4856+
4857+
// app1's only ask carries the newest creation time of the three
askRes1 := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
4858+
ask1 := newAllocationAskAll(allocKey, appID1, "", askRes1, 0, false, map[string]string{
4859+
siCommon.CreationTime: "1000",
4860+
})
4861+
err = app1.AddAllocationAsk(ask1)
4862+
assert.NilError(t, err, "could not add ask")
4863+
4864+
// the two asks from app2 should be scheduled first, followed by the ask from app1
// NOTE(review): allocKey3 is expected before allocKey2, presumably because requests
// inside one application are ordered by creation time as well — confirm.
4865+
alloc := partition.tryAllocate()
4866+
assert.Assert(t, alloc != nil, "no allocation was made")
4867+
assert.Equal(t, allocKey3, alloc.Request.GetAllocationKey())
4868+
alloc = partition.tryAllocate()
4869+
assert.Assert(t, alloc != nil, "no allocation was made")
4870+
assert.Equal(t, allocKey2, alloc.Request.GetAllocationKey())
4871+
alloc = partition.tryAllocate()
4872+
assert.Assert(t, alloc != nil, "no allocation was made")
4873+
assert.Equal(t, allocKey, alloc.Request.GetAllocationKey())
4874+
}

pkg/scheduler/utilities_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -557,18 +557,18 @@ func newApplicationTGTagsWithPhTimeout(appID, partition, queueName string, task
557557
}
558558

559559
// newAllocationAskTG builds a test ask for the given task group with priority 1.
// placeHolder is passed through unchanged; in the updated (+) line no allocation
// tags are set (nil is passed for the new tags parameter of newAllocationAskAll).
func newAllocationAskTG(allocKey, appID, taskGroup string, res *resources.Resource, placeHolder bool) *objects.Allocation {
560-
return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, placeHolder)
560+
return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, placeHolder, nil)
561561
}
562562

563563
// newAllocationAsk builds a plain test ask: empty task group name, priority 1,
// not a placeholder and, in the updated (+) line, no allocation tags.
func newAllocationAsk(allocKey, appID string, res *resources.Resource) *objects.Allocation {
564-
return newAllocationAskAll(allocKey, appID, "", res, 1, false)
564+
return newAllocationAskAll(allocKey, appID, "", res, 1, false, nil)
565565
}
566566

567567
// newAllocationAskPriority builds a test ask with the caller-supplied priority:
// empty task group name, not a placeholder and, in the updated (+) line, no
// allocation tags.
func newAllocationAskPriority(allocKey, appID string, res *resources.Resource, prio int32) *objects.Allocation {
568-
return newAllocationAskAll(allocKey, appID, "", res, prio, false)
568+
return newAllocationAskAll(allocKey, appID, "", res, prio, false, nil)
569569
}
570570

571-
func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resource, prio int32, placeHolder bool) *objects.Allocation {
571+
func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resource, prio int32, placeHolder bool, tags map[string]string) *objects.Allocation {
572572
return objects.NewAllocationFromSI(&si.Allocation{
573573
AllocationKey: allocKey,
574574
ApplicationID: appID,
@@ -577,6 +577,7 @@ func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resou
577577
Priority: prio,
578578
TaskGroupName: taskGroup,
579579
Placeholder: placeHolder,
580+
AllocationTags: tags,
580581
})
581582
}
582583

pkg/webservice/handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func getApplicationDAO(app *objects.Application) *dao.ApplicationDAOInfo {
359359
PendingResource: app.GetPendingResource().DAOMap(),
360360
Partition: common.GetPartitionNameWithoutClusterID(app.Partition),
361361
QueueName: app.GetQueuePath(),
362-
SubmissionTime: app.SubmissionTime.UnixNano(),
362+
SubmissionTime: app.GetSubmissionTime().UnixNano(),
363363
FinishedTime: common.ZeroTimeInUnixNano(app.FinishedTime()),
364364
Requests: getAllocationAsksDAO(app.GetAllRequests()),
365365
Allocations: getAllocationsDAO(app.GetAllAllocations()),

0 commit comments

Comments
 (0)