diff --git a/byoc/job_orchestrator_test.go b/byoc/job_orchestrator_test.go index 6e9c799a60..9dbe8fd1fd 100644 --- a/byoc/job_orchestrator_test.go +++ b/byoc/job_orchestrator_test.go @@ -96,6 +96,16 @@ func (r *mockJobOrchestrator) Sign(msg []byte) ([]byte, error) { func (r *mockJobOrchestrator) ExtraNodes() int { return r.extraNodes } +func (r *mockJobOrchestrator) OrchInfoSig() []byte { + if r.node != nil && len(r.node.InfoSig) > 0 { + return r.node.InfoSig + } + sig, err := r.Sign([]byte(r.Address().Hex())) + if err != nil { + return nil + } + return sig +} func (r *mockJobOrchestrator) VerifySig(addr ethcommon.Address, msg string, sig []byte) bool { return r.verifySignature(addr, msg, sig) } diff --git a/cmd/livepeer/starter/flags.go b/cmd/livepeer/starter/flags.go index 37df7ce5d1..edf514f09c 100644 --- a/cmd/livepeer/starter/flags.go +++ b/cmd/livepeer/starter/flags.go @@ -138,6 +138,8 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig { // flags cfg.TestOrchAvail = fs.Bool("startupAvailabilityCheck", *cfg.TestOrchAvail, "Set to false to disable the startup Orchestrator availability check on the configured serviceAddr") + cfg.RemoteSigner = fs.Bool("remoteSigner", *cfg.RemoteSigner, "Set to true to run remote signer service") + cfg.RemoteSignerUrl = fs.String("remoteSignerUrl", *cfg.RemoteSignerUrl, "URL of remote signer service to use (e.g., http://localhost:8935). Gateway only.") // Gateway metrics cfg.KafkaBootstrapServers = fs.String("kafkaBootstrapServers", *cfg.KafkaBootstrapServers, "URL of Kafka Bootstrap Servers") diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 0b15400810..91f67cf69b 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -167,6 +167,8 @@ type LivepeerConfig struct { OrchBlacklist *string OrchMinLivepeerVersion *string TestOrchAvail *bool + RemoteSigner *bool + RemoteSignerUrl *string AIRunnerImage *string AIRunnerImageOverrides *string AIVerboseLogs *bool @@ -302,6 +304,8 @@ func DefaultLivepeerConfig() LivepeerConfig { // Flags defaultTestOrchAvail := true + defaultRemoteSigner := false + defaultRemoteSignerUrl := "" // Gateway logs defaultKafkaBootstrapServers := "" @@ -422,7 +426,9 @@ func DefaultLivepeerConfig() LivepeerConfig { OrchMinLivepeerVersion: &defaultMinLivepeerVersion, // Flags - TestOrchAvail: &defaultTestOrchAvail, + TestOrchAvail: &defaultTestOrchAvail, + RemoteSigner: &defaultRemoteSigner, + RemoteSignerUrl: &defaultRemoteSignerUrl, // Gateway logs KafkaBootstrapServers: &defaultKafkaBootstrapServers, @@ -679,8 +685,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { } } + // Validate remote signer mode + if *cfg.RemoteSigner { + if *cfg.Network == "offchain" { + exit("Remote signer mode requires on-chain network") + } + } + if *cfg.Redeemer { n.NodeType = core.RedeemerNode + } else if *cfg.RemoteSigner { + n.NodeType = core.RemoteSignerNode } else if *cfg.Orchestrator { n.NodeType = core.OrchestratorNode if !*cfg.Transcoder { @@ -1567,6 +1582,40 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { } bcast := core.NewBroadcaster(n) + + // Populate infoSig with remote signer if configured + if *cfg.RemoteSignerUrl != "" { + url, err := url.Parse(*cfg.RemoteSignerUrl) + if err != nil { + glog.Exit("Invalid remote signer URL: ", err) + } + if url.Scheme == "" || url.Host == "" { + // Usually something like `host:port` or just plain `host` + // Prepend https:// for convenience + url, err = url.Parse("https://" + *cfg.RemoteSignerUrl) + if err != nil { + glog.Exit("Adding HTTPS to remote signer URL failed: ", err) + } + } + + glog.Info("Retrieving OrchestratorInfo fields from remote signer: ", url) + fields, err := server.GetOrchInfoSig(url) + if err != nil { + glog.Exit("Unable to query remote signer: ", err) + } + n.RemoteSignerUrl = url + n.RemoteEthAddr = ethcommon.BytesToAddress(fields.Address) + n.InfoSig = fields.Signature + glog.Info("Using Ethereum address from remote signer: ", n.RemoteEthAddr) + } else { + // Use local signing + infoSig, err := bcast.Sign([]byte(fmt.Sprintf("%v", bcast.Address().Hex()))) + if err != nil { + glog.Exit("Unable to generate info sig: ", err) + } + n.InfoSig = infoSig + } + orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist) if *cfg.OrchPerfStatsURL != "" && *cfg.Region != "" { glog.Infof("Using Performance Stats, region=%s, URL=%s, minPerfScore=%v", *cfg.Region, *cfg.OrchPerfStatsURL, *cfg.MinPerfScore) @@ -1793,6 +1842,18 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { }() } + // Start remote signer server if in remote signer mode + if n.NodeType == core.RemoteSignerNode { + go func() { + *cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "127.0.0.1", OrchestratorRpcPort) + glog.Info("Starting remote signer server on ", *cfg.HttpAddr) + err := server.StartRemoteSignerServer(s, *cfg.HttpAddr) + if err != nil { + exit("Error starting remote signer server: err=%q", err) + } + }() + } + go func() { if core.OrchestratorNode != n.NodeType { return diff --git a/common/types.go b/common/types.go index d5ea3da795..d6a879cf5b 100644 --- a/common/types.go +++ b/common/types.go @@ -45,6 +45,7 @@ type NodeStatus struct { type Broadcaster interface { Address() ethcommon.Address Sign([]byte) ([]byte, error) + OrchInfoSig() []byte ExtraNodes() int } diff --git a/core/broadcaster.go b/core/broadcaster.go index d89c103505..bb9bcca5ca 100644 --- a/core/broadcaster.go +++ b/core/broadcaster.go @@ -18,11 +18,23 @@ func (bcast *broadcaster) Sign(msg []byte) ([]byte, error) { return bcast.node.Eth.Sign(crypto.Keccak256(msg)) } func (bcast *broadcaster) Address() ethcommon.Address { - if bcast.node == nil || bcast.node.Eth == nil { + if bcast.node == nil { + return ethcommon.Address{} + } + if (bcast.node.RemoteEthAddr != ethcommon.Address{}) { + return bcast.node.RemoteEthAddr + } + if bcast.node.Eth == nil { return ethcommon.Address{} } return bcast.node.Eth.Account().Address } +func (bcast *broadcaster) OrchInfoSig() []byte { + if bcast == nil || bcast.node == nil { + return nil + } + return bcast.node.InfoSig +} func (bcast *broadcaster) ExtraNodes() int { if bcast == nil || bcast.node == nil { return 0 diff --git a/core/livepeernode.go b/core/livepeernode.go index b7aa51b39d..9b27c38eca 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -26,6 +26,8 @@ import ( "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/eth" lpmon "github.com/livepeer/go-livepeer/monitor" + + ethcommon "github.com/ethereum/go-ethereum/common" ) var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable") @@ -48,6 +50,7 @@ const ( TranscoderNode RedeemerNode AIWorkerNode + RemoteSignerNode ) var nodeTypeStrs = map[NodeType]string{ @@ -57,6 +60,7 @@ var nodeTypeStrs = map[NodeType]string{ TranscoderNode: "transcoder", RedeemerNode: "redeemer", AIWorkerNode: "aiworker", + RemoteSignerNode: "remotesigner", } func (t NodeType) String() string { @@ -144,6 +148,11 @@ type LivepeerNode struct { Sender pm.Sender ExtraNodes int + // Gateway fields for remote signers + RemoteSignerUrl *url.URL + RemoteEthAddr ethcommon.Address // eth address of the remote signer + InfoSig []byte // sig over eth address for the OrchestratorInfo request + // Thread safety for config fields mu sync.RWMutex StorageConfigs map[string]*transcodeConfig diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 7301ae9731..7b38bd49ad 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -36,6 +36,7 @@ type stubBroadcaster struct{} func (s *stubBroadcaster) Sign(msg []byte) ([]byte, error) { return []byte{}, nil } func (s *stubBroadcaster) Address() ethcommon.Address { return ethcommon.Address{} } func (s *stubBroadcaster) ExtraNodes() int { return 0 } +func (s *stubBroadcaster) OrchInfoSig() []byte { return nil } func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) { assert := assert.New(t) diff --git a/doc/remote-signer.md b/doc/remote-signer.md new file mode 100644 index 0000000000..b4e38db285 --- /dev/null +++ b/doc/remote-signer.md @@ -0,0 +1,96 @@ +# Remote signer + +The **remote signer** is a standalone `go-livepeer` node mode that separates **Ethereum key custody + signing** from the gateway’s **untrusted media handling**. It is intended to: + +- Improve security posture by removing Ethereum hot keys from the media processing path +- Enable web3-less gateway implementations natively on additional platforms such as browser, mobile, serverless and embedded backend apps +- Enable third-party payment operators to manage crypto payments separately from those managing media operations. + +## Current implementation status + +Remote signing was designed to initially target **Live AI** (`live-video-to-video`). + +Support for other workloads may be added in the future. + +The on-chain service registry is not used for Live AI workloads right now, so orchestrator discovery is not implemented as part of the remote signer. The gateway can learn about orchestrators via an orchestrator webhook (`-orchWebhookUrl`) or a static list (`-orchAddr`). + +This allows a gateway to run in offchain mode while still working with on-chain orchestrators. + +## Architecture + +At a high level, the gateway uses the remote signer to handle Ethereum-related operations such as generating signatures or probabilistic micropayment tickets: + +```mermaid +sequenceDiagram + participant RemoteSigner as RemoteSigner + participant Gateway as Gateway + participant Orchestrator as Orchestrator + + Gateway->>RemoteSigner: POST /sign-orchestrator-info + RemoteSigner-->>Gateway: {address, signature} + Gateway->>Orchestrator: GetOrchestratorInfo(Address=address,Sig=signature) + Orchestrator-->>Gateway: OrchestratorInfo (incl TicketParams) + + Note over Gateway,RemoteSigner: Live AI payments (asynchronous) + Gateway->>RemoteSigner: POST /generate-live-payment (orchInfo + signerState) + RemoteSigner-->>Gateway: {payment, segCreds, signerState'} + Gateway->>Orchestrator: POST /payment (headers: Livepeer-Payment, Livepeer-Segment) + Orchestrator-->>Gateway: PaymentResult (incl updated OrchestratorInfo) +``` + +## Usage + +### Remote signer node + +Start a remote signer by enabling the mode flag: + +- `-remoteSigner=true`: run the remote signer service + +The remote signer is intended to be its own standalone node type. The `-remoteSigner` flag cannot be combined with other mode flags such as `-gateway`, `-orchestrator`, `-transcoder`, etc. + +**The remote signer requires an on-chain network**. It cannot run with `-network=offchain` because it must have on-chain Ethereum connectivity to sign and manage payment tickets. + +The remote signer must have typical Ethereum flags configured (examples: `-network`, `-ethUrl`, `-ethController`, keystore/password flags). See the go-livepeer [devtool](https://github.com/livepeer/go-livepeer/blob/92bdb59f169056e3d1beba9b511554ea5d9eda72/cmd/devtool/devtool.go#L200-L212) for an example of what flags might be required. + +The remote signer listens to the standard go-livepeer HTTP port (8935) by default. To change the listening port or interface, use the `-httpAddr` flag. + +Example (fill in the placeholders for your environment): + +```bash +./livepeer \ + -remoteSigner \ + -network mainnet \ + -httpAddr 127.0.0.1:7936 \ + -ethUrl \ + -ethPassword + ... +``` + +### Gateway node + +Configure a gateway to use a remote signer with: + +- `-remoteSignerUrl `: base URL of the remote signer service (**gateway only**) + +If `-remoteSignerUrl` is set, the gateway will query the signer at startup and fail fast if it cannot reach the signer. + +**No Ethereum flags are necessary on the gateway** in this mode. Omit the `-network` flag entirely here; this makes the gateway run in offchain mode, but it will still be able to send work to on-chain orchestrators with the `-remoteSignerUrl` flag enabled. + +By default, if no URL scheme is provided, https is assumed and prepended to the remote signer URL. To override this (eg, to use a http:// URL) then include the scheme, eg `-remoteSignerUrl http://signer-host:port` + +Example: + +```bash +./livepeer \ + -gateway \ + -httpAddr :9935 \ + -remoteSignerUrl http://127.0.0.1:7936 \ + -orchAddr localhost:8935 \ + -v 6 +``` + +## Operational + security guidance + +For the moment, remote signers are intended to sit behind infrastructure controls rather than being exposed directly to end-users. For example, run the remote signer on a private network or behind an authenticated proxy. Do not expose the remote signer to unauthenticated end-users. Run the remote signer close to gateways on a private network; protect it like you would an internal wallet service. + +Remote signers are stateless, so signer nodes can operate in a redundant configuration (eg, round-robin DNS, anycasting) with no special gateway-side configuration. diff --git a/server/remote_signer.go b/server/remote_signer.go new file mode 100644 index 0000000000..c3b8304349 --- /dev/null +++ b/server/remote_signer.go @@ -0,0 +1,141 @@ +package server + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/golang/glog" + "github.com/livepeer/go-livepeer/clog" + "github.com/livepeer/go-livepeer/core" +) + +// SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators +func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) { + ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID())) + remoteAddr := getRemoteAddr(r) + clog.Info(ctx, "Orch info signature request", "ip", remoteAddr) + + // Get the broadcaster (signer) + // In remote signer mode, we may not have an OrchestratorPool, so create a broadcaster directly + gw := core.NewBroadcaster(ls.LivepeerNode) + + // Create empty params for signing + params := GetOrchestratorInfoParams{} + + // Generate the request (this creates the signature) + req, err := genOrchestratorReq(gw, params) + if err != nil { + clog.Errorf(ctx, "Failed to generate request: err=%q", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + // Extract signature and format as hex + var ( + signature = "0x" + hex.EncodeToString(req.Sig) + address = gw.Address().String() + ) + + results := map[string]string{ + "address": address, + "signature": signature, + } + + // Return JSON response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(results) +} + +// StartRemoteSignerServer starts the HTTP server for remote signer mode +func StartRemoteSignerServer(ls *LivepeerServer, bind string) error { + // Register the remote signer endpoint + ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo)) + + // Start the HTTP server + glog.Info("Starting Remote Signer server on ", bind) + gw := core.NewBroadcaster(ls.LivepeerNode) + sig, err := gw.Sign([]byte(fmt.Sprintf("%v", gw.Address().Hex()))) + if err != nil { + return err + } + ls.LivepeerNode.InfoSig = sig + srv := http.Server{ + Addr: bind, + Handler: ls.HTTPMux, + IdleTimeout: HTTPIdleTimeout, + } + return srv.ListenAndServe() +} + +// HexBytes represents a byte slice that marshals/unmarshals as hex with 0x prefix +type HexBytes []byte + +func (h HexBytes) MarshalJSON() ([]byte, error) { + hexStr := "0x" + hex.EncodeToString(h) + return json.Marshal(hexStr) +} + +func (h *HexBytes) UnmarshalJSON(data []byte) error { + var hexStr string + if err := json.Unmarshal(data, &hexStr); err != nil { + return err + } + + // Remove 0x prefix if present + if len(hexStr) >= 2 && hexStr[:2] == "0x" { + hexStr = hexStr[2:] + } + + // Decode hex string to bytes + decoded, err := hex.DecodeString(hexStr) + if err != nil { + return fmt.Errorf("invalid hex string: %v", err) + } + + *h = decoded + return nil +} + +// OrchInfoSigResponse represents the response from the remote signer +type OrchInfoSigResponse struct { + Address HexBytes `json:"address"` + Signature HexBytes `json:"signature"` +} + +// Gateway helper that calls the remote signer service for the GetOrchestratorInfo signature +func GetOrchInfoSig(remoteSignerHost *url.URL) (*OrchInfoSigResponse, error) { + + url := remoteSignerHost.ResolveReference(&url.URL{Path: "/sign-orchestrator-info"}) + + // Create HTTP client with timeout + client := &http.Client{ + Timeout: 30 * time.Second, + } + + // Make the request + resp, err := client.Post(url.String(), "application/json", nil) + if err != nil { + return nil, fmt.Errorf("failed to call remote signer: %w", err) + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("remote signer returned status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var signerResp OrchInfoSigResponse + if err := json.NewDecoder(resp.Body).Decode(&signerResp); err != nil { + return nil, fmt.Errorf("failed to parse remote signer response: %w", err) + } + + return &signerResp, nil +} diff --git a/server/rpc.go b/server/rpc.go index 411b97b2fd..da335b98aa 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -362,11 +362,7 @@ func startOrchestratorClient(ctx context.Context, uri *url.URL) (net.Orchestrato } func genOrchestratorReq(b common.Broadcaster, params GetOrchestratorInfoParams) (*net.OrchestratorRequest, error) { - sig, err := b.Sign([]byte(fmt.Sprintf("%v", b.Address().Hex()))) - if err != nil { - return nil, err - } - return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: sig, Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil + return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: b.OrchInfoSig(), Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil } func genEndSessionRequest(sess *BroadcastSession) (*net.EndTranscodingSessionRequest, error) { diff --git a/server/rpc_test.go b/server/rpc_test.go index 4268e35665..f885d52764 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -284,6 +284,10 @@ func (r *stubOrchestrator) GetUrlForCapability(capability string) string { func (r *stubOrchestrator) ExtraNodes() int { return 0 } +func (r *stubOrchestrator) OrchInfoSig() []byte { + b, _ := r.Sign([]byte(r.Address().Hex())) + return b +} func stubBroadcaster2() *stubOrchestrator { return newStubOrchestrator() // lazy; leverage subtyping for interface commonalities @@ -323,13 +327,6 @@ func TestRPCTranscoderReq(t *testing.T) { t.Errorf("Expected %v; got %v", o.sessCapErr, err) } o.sessCapErr = nil - - // error signing - b.signErr = fmt.Errorf("Signing error") - _, err = genOrchestratorReq(b, GetOrchestratorInfoParams{}) - if err == nil { - t.Error("Did not expect to generate a orchestrator request with invalid address") - } } func TestRPCSeg(t *testing.T) { diff --git a/test_args.sh b/test_args.sh index d3e83597a9..0e3df3df02 100755 --- a/test_args.sh +++ b/test_args.sh @@ -216,6 +216,15 @@ res=0 $TMPDIR/livepeer -gateway -verifierUrl http\\://host/ || res=$? [ $res -ne 0 ] +# Check remote signer URL handling +$TMPDIR/livepeer -gateway -remoteSignerUrl abc:65535 2>&1 | grep -e "Retrieving OrchestratorInfo fields from remote signer: https://abc:65535" + +$TMPDIR/livepeer -gateway -remoteSignerUrl http://127.0.0.1:65535 2>&1 | grep -e "Retrieving OrchestratorInfo fields from remote signer: http://127.0.0.1:65535" + +$TMPDIR/livepeer -gateway -remoteSignerUrl "http://[::1" 2>&1 | grep -e "Invalid remote signer URL" + +$TMPDIR/livepeer -gateway -remoteSignerUrl abc:def 2>&1 | grep -e 'Adding HTTPS to remote signer URL failed: parse "https://abc:def": invalid port ":def"' + # Check that verifier shared path is required $TMPDIR/livepeer -gateway -verifierUrl http://host 2>&1 | grep "Requires a path to the"