Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions server/job_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math/rand"
"net/http"
"net/url"
"slices"
"strings"
"time"

Expand All @@ -43,6 +44,7 @@ const jobOrchSearchTimeoutDefault = 1 * time.Second
const jobOrchSearchRespTimeoutDefault = 500 * time.Millisecond

var errNoTimeoutSet = errors.New("no timeout_seconds set with request, timeout_seconds is required")
var sendJobReqWithTimeout = sendReqWithTimeout

type JobSender struct {
Addr string `json:"addr"`
Expand Down Expand Up @@ -71,6 +73,15 @@ type JobRequest struct {
orchSearchRespTimeout time.Duration
}

type JobParameters struct {
Orchestrators JobOrchestratorsFilter `json:"orchestrators,omitempty"` //list of orchestrators to use for the job
}

type JobOrchestratorsFilter struct {
Exclude []string `json:"exclude,omitempty"`
Include []string `json:"include,omitempty"`
}

// worker registers to Orchestrator
func (h *lphttp) RegisterCapability(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Expand Down Expand Up @@ -285,8 +296,15 @@ func (ls *LivepeerServer) submitJob(ctx context.Context, w http.ResponseWriter,
jobReq.orchSearchTimeout = searchTimeout
jobReq.orchSearchRespTimeout = respTimeout

var params JobParameters
if err := json.Unmarshal([]byte(jobReq.Parameters), &params); err != nil {
clog.Errorf(ctx, "Unable to unmarshal job parameters err=%v", err)
http.Error(w, fmt.Sprintf("Unable to unmarshal job parameters err=%v", err), http.StatusBadRequest)
return
}

//get pool of Orchestrators that can do the job
orchs, err := getJobOrchestrators(ctx, ls.LivepeerNode, jobReq.Capability, jobReq.orchSearchTimeout, jobReq.orchSearchRespTimeout)
orchs, err := getJobOrchestrators(ctx, ls.LivepeerNode, jobReq.Capability, params, jobReq.orchSearchTimeout, jobReq.orchSearchRespTimeout)
if err != nil {
clog.Errorf(ctx, "Unable to find orchestrators for capability %v err=%v", jobReq.Capability, err)
http.Error(w, fmt.Sprintf("Unable to find orchestrators for capability %v err=%v", jobReq.Capability, err), http.StatusBadRequest)
Expand Down Expand Up @@ -365,7 +383,7 @@ func (ls *LivepeerServer) submitJob(ctx context.Context, w http.ResponseWriter,
}

start := time.Now()
resp, err := sendReqWithTimeout(req, time.Duration(jobReq.Timeout+5)*time.Second) //include 5 second buffer
resp, err := sendJobReqWithTimeout(req, time.Duration(jobReq.Timeout+5)*time.Second) //include 5 second buffer
if err != nil {
clog.Errorf(ctx, "job not able to be processed by Orchestrator %v err=%v ", orchToken.ServiceAddr, err.Error())
continue
Expand Down Expand Up @@ -991,7 +1009,7 @@ func getOrchSearchTimeouts(ctx context.Context, searchTimeoutHdr, respTimeoutHdr
return timeout, respTimeout
}

func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capability string, timeout time.Duration, respTimeout time.Duration) ([]JobToken, error) {
func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capability string, params JobParameters, timeout time.Duration, respTimeout time.Duration) ([]JobToken, error) {
orchs := node.OrchestratorPool.GetInfos()
gateway := node.OrchestratorPool.Broadcaster()

Expand Down Expand Up @@ -1019,7 +1037,7 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit
return
}

resp, err := sendReqWithTimeout(tokenReq, respTimeout)
resp, err := sendJobReqWithTimeout(tokenReq, respTimeout)
if err != nil {
clog.Errorf(ctx, "failed to get token from Orchestrator err=%v", err)
errCh <- err
Expand Down Expand Up @@ -1063,6 +1081,17 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit
tokensCtx, cancel := context.WithTimeout(clog.Clone(context.Background(), ctx), timeout)
// Shuffle and get job tokens
for _, i := range rand.Perm(len(orchs)) {
//do not send to excluded Orchestrators
if slices.Contains(params.Orchestrators.Exclude, orchs[i].URL.String()) {
numAvailableOrchs--
continue
}
//if include is set, only send to those Orchestrators
if len(params.Orchestrators.Include) > 0 && !slices.Contains(params.Orchestrators.Include, orchs[i].URL.String()) {
numAvailableOrchs--
continue
}

go getOrchJobToken(ctx, orchs[i].URL, *reqSender, 500*time.Millisecond, tokenCh, errCh)
}

Expand Down
Loading
Loading