diff --git a/server/job_rpc.go b/server/job_rpc.go index d474654236..b6f729ac9e 100644 --- a/server/job_rpc.go +++ b/server/job_rpc.go @@ -17,6 +17,7 @@ import ( "math/rand" "net/http" "net/url" + "slices" "strings" "time" @@ -43,6 +44,7 @@ const jobOrchSearchTimeoutDefault = 1 * time.Second const jobOrchSearchRespTimeoutDefault = 500 * time.Millisecond var errNoTimeoutSet = errors.New("no timeout_seconds set with request, timeout_seconds is required") +var sendJobReqWithTimeout = sendReqWithTimeout type JobSender struct { Addr string `json:"addr"` @@ -71,6 +73,15 @@ type JobRequest struct { orchSearchRespTimeout time.Duration } +type JobParameters struct { + Orchestrators JobOrchestratorsFilter `json:"orchestrators,omitempty"` //list of orchestrators to use for the job +} + +type JobOrchestratorsFilter struct { + Exclude []string `json:"exclude,omitempty"` + Include []string `json:"include,omitempty"` +} + // worker registers to Orchestrator func (h *lphttp) RegisterCapability(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { @@ -285,8 +296,15 @@ func (ls *LivepeerServer) submitJob(ctx context.Context, w http.ResponseWriter, jobReq.orchSearchTimeout = searchTimeout jobReq.orchSearchRespTimeout = respTimeout + var params JobParameters + if err := json.Unmarshal([]byte(jobReq.Parameters), &params); err != nil { + clog.Errorf(ctx, "Unable to unmarshal job parameters err=%v", err) + http.Error(w, fmt.Sprintf("Unable to unmarshal job parameters err=%v", err), http.StatusBadRequest) + return + } + //get pool of Orchestrators that can do the job - orchs, err := getJobOrchestrators(ctx, ls.LivepeerNode, jobReq.Capability, jobReq.orchSearchTimeout, jobReq.orchSearchRespTimeout) + orchs, err := getJobOrchestrators(ctx, ls.LivepeerNode, jobReq.Capability, params, jobReq.orchSearchTimeout, jobReq.orchSearchRespTimeout) if err != nil { clog.Errorf(ctx, "Unable to find orchestrators for capability %v err=%v", jobReq.Capability, err) http.Error(w, 
fmt.Sprintf("Unable to find orchestrators for capability %v err=%v", jobReq.Capability, err), http.StatusBadRequest) @@ -365,7 +383,7 @@ func (ls *LivepeerServer) submitJob(ctx context.Context, w http.ResponseWriter, } start := time.Now() - resp, err := sendReqWithTimeout(req, time.Duration(jobReq.Timeout+5)*time.Second) //include 5 second buffer + resp, err := sendJobReqWithTimeout(req, time.Duration(jobReq.Timeout+5)*time.Second) //include 5 second buffer if err != nil { clog.Errorf(ctx, "job not able to be processed by Orchestrator %v err=%v ", orchToken.ServiceAddr, err.Error()) continue @@ -991,7 +1009,7 @@ func getOrchSearchTimeouts(ctx context.Context, searchTimeoutHdr, respTimeoutHdr return timeout, respTimeout } -func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capability string, timeout time.Duration, respTimeout time.Duration) ([]JobToken, error) { +func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capability string, params JobParameters, timeout time.Duration, respTimeout time.Duration) ([]JobToken, error) { orchs := node.OrchestratorPool.GetInfos() gateway := node.OrchestratorPool.Broadcaster() @@ -1019,7 +1037,7 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit return } - resp, err := sendReqWithTimeout(tokenReq, respTimeout) + resp, err := sendJobReqWithTimeout(tokenReq, respTimeout) if err != nil { clog.Errorf(ctx, "failed to get token from Orchestrator err=%v", err) errCh <- err @@ -1063,6 +1081,17 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit tokensCtx, cancel := context.WithTimeout(clog.Clone(context.Background(), ctx), timeout) // Shuffle and get job tokens for _, i := range rand.Perm(len(orchs)) { + //do not send to excluded Orchestrators + if slices.Contains(params.Orchestrators.Exclude, orchs[i].URL.String()) { + numAvailableOrchs-- + continue + } + //if include is set, only send to those Orchestrators + if 
len(params.Orchestrators.Include) > 0 && !slices.Contains(params.Orchestrators.Include, orchs[i].URL.String()) { + numAvailableOrchs-- + continue + } + go getOrchJobToken(ctx, orchs[i].URL, *reqSender, 500*time.Millisecond, tokenCh, errCh) } diff --git a/server/job_rpc_test.go b/server/job_rpc_test.go index 7169f9aa13..1db03b4274 100644 --- a/server/job_rpc_test.go +++ b/server/job_rpc_test.go @@ -12,15 +12,18 @@ import ( "net/http" "net/http/httptest" "net/url" + "slices" "testing" + "time" "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/common" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" ethcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/livepeer/go-livepeer/ai/worker" + "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/net" "github.com/livepeer/go-tools/drivers" "github.com/livepeer/lpms/stream" @@ -43,14 +46,14 @@ type mockJobOrchestrator struct { registerExternalCapability func(string) (*core.ExternalCapability, error) unregisterExternalCapability func(string) error - verifySignature func(common.Address, string, []byte) bool + verifySignature func(ethcommon.Address, string, []byte) bool checkExternalCapabilityCapacity func(string) bool reserveCapacity func(string) error getUrlForCapability func(string) string - balance func(common.Address, core.ManifestID) *big.Rat - debitFees func(common.Address, core.ManifestID, *net.PriceInfo, int64) + balance func(ethcommon.Address, core.ManifestID) *big.Rat + debitFees func(ethcommon.Address, core.ManifestID, *net.PriceInfo, int64) freeCapacity func(string) error - jobPriceInfo func(common.Address, string) (*net.PriceInfo, error) + jobPriceInfo func(ethcommon.Address, string) (*net.PriceInfo, error) ticketParams func(ethcommon.Address, *net.PriceInfo) (*net.TicketParams, 
error) } @@ -268,6 +271,67 @@ func newMockJobOrchestrator() *mockJobOrchestrator { return &mockJobOrchestrator{priv: pk, block: big.NewInt(5)} } +// stubJobOrchestratorPool is a stub implementation of the OrchestratorPool interface +type stubJobOrchestratorPool struct { + uris []*url.URL + infos []common.OrchestratorLocalInfo + node *core.LivepeerNode +} + +func newStubOrchestratorPool(node *core.LivepeerNode, uris []string) *stubJobOrchestratorPool { + var urlList []*url.URL + var infos []common.OrchestratorLocalInfo + for _, uri := range uris { + if u, err := url.Parse(uri); err == nil { + urlList = append(urlList, u) + infos = append(infos, common.OrchestratorLocalInfo{URL: u, Score: 1.0}) + } + } + return &stubJobOrchestratorPool{ + uris: urlList, + infos: infos, + node: mockJobLivepeerNode(), + } +} + +func (s *stubJobOrchestratorPool) GetInfos() []common.OrchestratorLocalInfo { + var infos []common.OrchestratorLocalInfo + for _, uri := range s.uris { + infos = append(infos, common.OrchestratorLocalInfo{URL: uri}) + } + return infos +} +func (s *stubJobOrchestratorPool) GetOrchestrators(ctx context.Context, max int, suspender common.Suspender, comparator common.CapabilityComparator, scorePred common.ScorePred) (common.OrchestratorDescriptors, error) { + var ods common.OrchestratorDescriptors + for _, uri := range s.uris { + ods = append(ods, common.OrchestratorDescriptor{ + LocalInfo: &common.OrchestratorLocalInfo{URL: uri, Score: 1.0}, + RemoteInfo: &net.OrchestratorInfo{ + Transcoder: uri.String(), + }, + }) + } + return ods, nil +} +func (s *stubJobOrchestratorPool) Size() int { + return len(s.uris) +} +func (s *stubJobOrchestratorPool) SizeWith(scorePred common.ScorePred) int { + if scorePred == nil { + return len(s.infos) + } + count := 0 + for _, info := range s.infos { + if scorePred(info.Score) { + count++ + } + } + return count +} +func (s *stubJobOrchestratorPool) Broadcaster() common.Broadcaster { + return core.NewBroadcaster(s.node) +} + func 
mockJobLivepeerNode() *core.LivepeerNode { node, _ := core.NewLivepeerNode(nil, "/tmp/thisdirisnotactuallyusedinthistest", nil) node.NodeType = core.OrchestratorNode @@ -495,7 +559,7 @@ func TestGetJobToken_MissingEthAddressHeader(t *testing.T) { } func TestGetJobToken_InvalidEthAddressHeader(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return false } @@ -525,7 +589,7 @@ func TestGetJobToken_InvalidEthAddressHeader(t *testing.T) { } func TestGetJobToken_MissingCapabilityHeader(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } mockJobOrch := newMockJobOrchestrator() @@ -554,7 +618,7 @@ func TestGetJobToken_MissingCapabilityHeader(t *testing.T) { } func TestGetJobToken_NoCapacity(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } mockCheckExternalCapabilityCapacity := func(extCap string) bool { @@ -597,7 +661,7 @@ func TestGetJobToken_NoCapacity(t *testing.T) { } func TestGetJobToken_JobPriceInfoError(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } @@ -605,7 +669,7 @@ func TestGetJobToken_JobPriceInfoError(t *testing.T) { return nil } - mockJobPriceInfo := func(addr common.Address, cap string) (*net.PriceInfo, error) { + mockJobPriceInfo := func(addr ethcommon.Address, cap string) (*net.PriceInfo, error) { return nil, errors.New("price error") } @@ -640,7 +704,7 @@ func TestGetJobToken_JobPriceInfoError(t *testing.T) { } func TestGetJobToken_InsufficientReserve(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) 
bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } @@ -648,7 +712,7 @@ func TestGetJobToken_InsufficientReserve(t *testing.T) { return nil } - mockJobPriceInfo := func(addr common.Address, cap string) (*net.PriceInfo, error) { + mockJobPriceInfo := func(addr ethcommon.Address, cap string) (*net.PriceInfo, error) { return nil, errors.New("insufficient sender reserve") } @@ -684,7 +748,7 @@ func TestGetJobToken_InsufficientReserve(t *testing.T) { } func TestGetJobToken_TicketParamsError(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } @@ -692,14 +756,14 @@ func TestGetJobToken_TicketParamsError(t *testing.T) { return nil } - mockJobPriceInfo := func(addr common.Address, cap string) (*net.PriceInfo, error) { + mockJobPriceInfo := func(addr ethcommon.Address, cap string) (*net.PriceInfo, error) { return &net.PriceInfo{ PricePerUnit: 10, PixelsPerUnit: 1, }, nil } - mockTicketParams := func(addr common.Address, price *net.PriceInfo) (*net.TicketParams, error) { + mockTicketParams := func(addr ethcommon.Address, price *net.PriceInfo) (*net.TicketParams, error) { return nil, errors.New("ticket params error") } mockJobOrch := newMockJobOrchestrator() @@ -735,7 +799,7 @@ func TestGetJobToken_TicketParamsError(t *testing.T) { } func TestGetJobToken_Success(t *testing.T) { - mockVerifySig := func(addr common.Address, msg string, sig []byte) bool { + mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } @@ -743,16 +807,16 @@ func TestGetJobToken_Success(t *testing.T) { return nil } - mockJobPriceInfo := func(addr common.Address, cap string) (*net.PriceInfo, error) { + mockJobPriceInfo := func(addr ethcommon.Address, cap string) (*net.PriceInfo, error) { return &net.PriceInfo{ PricePerUnit: 10, PixelsPerUnit: 1, }, nil } - mockTicketParams := func(addr 
common.Address, price *net.PriceInfo) (*net.TicketParams, error) { + mockTicketParams := func(addr ethcommon.Address, price *net.PriceInfo) (*net.TicketParams, error) { return &net.TicketParams{ - Recipient: common.HexToAddress("0x1111111111111111111111111111111111111111").Bytes(), + Recipient: ethcommon.HexToAddress("0x1111111111111111111111111111111111111111").Bytes(), FaceValue: big.NewInt(1000).Bytes(), WinProb: big.NewInt(1).Bytes(), RecipientRandHash: []byte("hash"), @@ -761,7 +825,7 @@ func TestGetJobToken_Success(t *testing.T) { }, nil } - mockBalance := func(addr common.Address, manifestID core.ManifestID) *big.Rat { + mockBalance := func(addr ethcommon.Address, manifestID core.ManifestID) *big.Rat { return big.NewRat(1000, 1) } @@ -837,3 +901,142 @@ func TestSubmitJob_MethodNotAllowed(t *testing.T) { resp := w.Result() assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode) } + +func TestSubmitJob_OrchestratorSelectionParams(t *testing.T) { + // Create mock HTTP servers for orchestrators + mockServers := make([]*httptest.Server, 5) + orchURLs := make([]string, 5) + + // Create a handler that returns a valid job token + tokenHandler := func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/process/token" { + http.NotFound(w, r) + return + } + + token := &JobToken{ + ServiceAddr: "http://" + r.Host, // Use the server's host as the service address + SenderAddress: &JobSender{ + Addr: "0x1234567890abcdef1234567890abcdef123456", + Sig: "0x456", + }, + TicketParams: nil, + Price: &net.PriceInfo{ + PricePerUnit: 100, + PixelsPerUnit: 1, + }, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(token) + } + + // Start HTTP test servers + for i := 0; i < 5; i++ { + server := httptest.NewServer(http.HandlerFunc(tokenHandler)) + mockServers[i] = server + orchURLs[i] = server.URL + t.Logf("Mock server %d started at %s", i, orchURLs[i]) + } + + // Clean up servers when test completes + defer func() { + for _, server 
:= range mockServers { + server.Close() + } + }() + + node := mockJobLivepeerNode() + pool := newStubOrchestratorPool(node, orchURLs) + node.OrchestratorPool = pool + + // Define test cases + testCases := []struct { + name string + include []string + exclude []string + expectedCount int + }{ + { + name: "No filtering", + include: []string{}, + exclude: []string{}, + expectedCount: 5, // All orchestrators + }, + { + name: "Include specific orchestrators", + include: []string{orchURLs[0], orchURLs[2]}, // First and third servers + exclude: []string{}, + expectedCount: 2, + }, + { + name: "Exclude specific orchestrators", + include: []string{}, + exclude: []string{orchURLs[1], orchURLs[3]}, // Second and fourth servers + expectedCount: 3, + }, + { + name: "Both include and exclude", + include: []string{orchURLs[0], orchURLs[1], orchURLs[2]}, // First three servers + exclude: []string{orchURLs[1]}, // Exclude second server + expectedCount: 2, // Should have first and third servers + }, + { + name: "Include non-existent orchestrators", + include: []string{"http://nonexistent.example.com"}, + exclude: []string{}, + expectedCount: 0, + }, + { + name: "Exclude all orchestrators", + include: []string{}, + exclude: orchURLs, // Exclude all servers + expectedCount: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create JobParameters with the test case's filters + params := JobParameters{ + Orchestrators: JobOrchestratorsFilter{ + Include: tc.include, + Exclude: tc.exclude, + }, + } + + // Call getJobOrchestrators + tokens, err := getJobOrchestrators( + context.Background(), + node, + "test-capability", + params, + 100*time.Millisecond, // Short timeout for testing + 50*time.Millisecond, + ) + + if tc.expectedCount == 0 { + // If we expect no orchestrators, we should still get a nil error + // because the function should return an empty list, not an error + assert.NoError(t, err) + assert.Len(t, tokens, 0) + } else { + 
assert.NoError(t, err) + assert.Len(t, tokens, tc.expectedCount) + + if len(tc.include) > 0 { + for _, token := range tokens { + assert.True(t, slices.Contains(tc.include, token.ServiceAddr)) + } + } + + if len(tc.exclude) > 0 { + for _, token := range tokens { + assert.False(t, slices.Contains(tc.exclude, token.ServiceAddr)) + } + } + } + }) + } + +}