Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a1fac53
feat(remote_signer, orchestrator) Add external capabilities support t…
eliteprox Feb 3, 2026
ec501ab
Refactor ExternalCapabilityInfo: Remove PriceInfo field and related m…
eliteprox Feb 4, 2026
4a889ef
Implement capabilities endpoint and refactor ExternalCapabilityInfo
eliteprox Feb 4, 2026
85268d8
revert changes to gRPC
eliteprox Feb 4, 2026
df9275f
feat(capabilities): Add BYOC external capability and integrate pricin…
eliteprox Feb 11, 2026
18472ac
refactor(capabilities): cleanup unnecessary capability changes
eliteprox Feb 11, 2026
4d47903
Merge branch 'master' into feat/remote-signer-byoc
eliteprox Feb 11, 2026
deae93b
Merge branch 'master' into feat/remote-signer-byoc
eliteprox Feb 11, 2026
c500fc5
Merge branch 'master' into feat/remote-signer-byoc
eliteprox Mar 2, 2026
7287b04
apply gofmt
eliteprox Mar 2, 2026
cf9ca75
Enhance BYOC capability validation in remote signer
eliteprox Mar 3, 2026
9884e79
Refactor PriceInfo methods to simplify node checks and enhance BYOC p…
eliteprox Mar 3, 2026
a78e876
Merge branch 'master' into feat/remote-signer-byoc
eliteprox Mar 27, 2026
947825a
Enhance BYOC job signing and payment processing
eliteprox Apr 3, 2026
5f3d6be
Refactor job credential verification in BYOC orchestrator
eliteprox Apr 3, 2026
b08d680
Merge branch 'master' into feat/remote-signer-byoc
eliteprox Apr 3, 2026
ec6bd32
revert erroneous changes to pricing logic in orchestrator.go, fix tests
eliteprox Apr 3, 2026
55ecbbc
revert removed comments
eliteprox Apr 3, 2026
a6b94a8
Update test for GetOrchestrator to assert empty capabilities prices i…
eliteprox Apr 3, 2026
a19eb98
Refactor pricing logic in orchestrator.go to remove unnecessary recip…
eliteprox Apr 3, 2026
5342094
revert changes to byoc/stream_orchestrator.go
eliteprox Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions byoc/job_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ func (bso *BYOCOrchestratorServer) confirmPayment(ctx context.Context, sender et
orchBal, pmtErr := bso.processPayment(ctx, sender, capability, paymentHdr)
if pmtErr != nil {
//log if there are payment errors but continue, balance will runout and clean up
if paymentHdr != "" {
clog.Errorf(ctx, "rejecting request: payment header present but invalid: %v", pmtErr)
return errPaymentError
}
clog.Infof(ctx, "job payment error: %v", pmtErr)
}

Expand All @@ -502,25 +506,22 @@ func (bso *BYOCOrchestratorServer) confirmPayment(ctx context.Context, sender et
return nil
}

// process payment and return balance
// processPayment decodes and applies the payment header if present.
// Always returns a non-nil balance so callers can safely compare.
func (bso *BYOCOrchestratorServer) processPayment(ctx context.Context, sender ethcommon.Address, capability string, paymentHdr string) (*big.Rat, error) {
if paymentHdr != "" {
payment, err := getPayment(paymentHdr)
if err != nil {
clog.Errorf(ctx, "job payment invalid: %v", err)
return nil, errPaymentError
return bso.getPaymentBalance(sender, capability), errPaymentError
}

if err := bso.orch.ProcessPayment(ctx, payment, core.ManifestID(capability)); err != nil {
bso.orch.FreeExternalCapabilityCapacity(capability)
clog.Errorf(ctx, "Error processing payment: %v", err)
return nil, errPaymentError
return bso.getPaymentBalance(sender, capability), errPaymentError
}
}
orchBal := bso.getPaymentBalance(sender, capability)

return orchBal, nil

return bso.getPaymentBalance(sender, capability), nil
}

func (bso *BYOCOrchestratorServer) chargeForCompute(start time.Time, price *net.PriceInfo, sender ethcommon.Address, jobId string) {
Expand Down Expand Up @@ -566,18 +567,26 @@ func (bso *BYOCOrchestratorServer) verifyJobCreds(ctx context.Context, jobCreds
return nil, errSegSig
}

if !bso.orch.VerifySig(ethcommon.HexToAddress(jobData.Sender), jobData.Request+jobData.Parameters, sigByte) {
clog.Errorf(ctx, "Sig check failed sender=%v", jobData.Sender)
return nil, errSegSig
}
sender := ethcommon.HexToAddress(jobData.Sender)

if reserveCapacity && bso.orch.ReserveExternalCapabilityCapacity(jobData.Capability) != nil {
return nil, errZeroCapacity
// Verify V1 structured binary format
v1Payload := FlattenBYOCJob(&BYOCJobSigningInput{
ID: jobData.ID,
Capability: jobData.Capability,
Request: jobData.Request,
Parameters: jobData.Parameters,
TimeoutSeconds: jobData.Timeout,
})
if bso.orch.VerifySig(sender, string(v1Payload), sigByte) {
if reserveCapacity && bso.orch.ReserveExternalCapabilityCapacity(jobData.Capability) != nil {
return nil, errZeroCapacity
}
jobData.CapabilityUrl = bso.orch.GetUrlForCapability(jobData.Capability)
return jobData, nil
}

jobData.CapabilityUrl = bso.orch.GetUrlForCapability(jobData.Capability)

return jobData, nil
clog.Errorf(ctx, "Sig check failed sender=%v", jobData.Sender)
return nil, errSegSig
}

func (bso *BYOCOrchestratorServer) verifyTokenCreds(ctx context.Context, tokenCreds string) (*JobSender, error) {
Expand Down
64 changes: 64 additions & 0 deletions byoc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package byoc
import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"math/big"
gonet "net"
Expand Down Expand Up @@ -279,3 +280,66 @@ type byocLiveRequestParams struct {
// when the write for the last segment started
lastSegmentTime time.Time
}

// BYOCJobSigV1Prefix is the 16-byte domain separator for BYOC job signatures (V1).
// Prevents cross-protocol signature replay.
const BYOCJobSigV1Prefix = "LP_BYOC_JOB_V1\x00\x00"

// BYOCJobSigningInput holds the fields that are bound into a BYOC job signature.
type BYOCJobSigningInput struct {
ID string
Capability string
Request string
Parameters string
TimeoutSeconds int
}

// FlattenBYOCJob produces a deterministic binary representation of a BYOC job
// for signing, similar to SegTranscodingMetadata.Flatten() used by LV2V.
//
// Wire format:
//
// version(16) || timeout(4,BE) || len(id)(4,BE) || id || len(cap)(4,BE) || cap
// || len(req)(4,BE) || req || len(params)(4,BE) || params
func FlattenBYOCJob(job *BYOCJobSigningInput) []byte {
idBytes := []byte(job.ID)
capBytes := []byte(job.Capability)
reqBytes := []byte(job.Request)
paramsBytes := []byte(job.Parameters)

size := 16 + 4 +
4 + len(idBytes) +
4 + len(capBytes) +
4 + len(reqBytes) +
4 + len(paramsBytes)

buf := make([]byte, size)
offset := 0

copy(buf[offset:], []byte(BYOCJobSigV1Prefix))
offset += 16

binary.BigEndian.PutUint32(buf[offset:], uint32(job.TimeoutSeconds))
offset += 4

binary.BigEndian.PutUint32(buf[offset:], uint32(len(idBytes)))
offset += 4
copy(buf[offset:], idBytes)
offset += len(idBytes)

binary.BigEndian.PutUint32(buf[offset:], uint32(len(capBytes)))
offset += 4
copy(buf[offset:], capBytes)
offset += len(capBytes)

binary.BigEndian.PutUint32(buf[offset:], uint32(len(reqBytes)))
offset += 4
copy(buf[offset:], reqBytes)
offset += len(reqBytes)

binary.BigEndian.PutUint32(buf[offset:], uint32(len(paramsBytes)))
offset += 4
copy(buf[offset:], paramsBytes)

return buf
}
11 changes: 10 additions & 1 deletion byoc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,16 @@ var sendJobReqWithTimeout = sendReqWithTimeout
func (g *gatewayJob) sign() error {
//sign the request
gateway := g.node.OrchestratorPool.Broadcaster()
sig, err := gateway.Sign([]byte(g.Job.Req.Request + g.Job.Req.Parameters))

sigPayload := FlattenBYOCJob(&BYOCJobSigningInput{
ID: g.Job.Req.ID,
Capability: g.Job.Req.Capability,
Request: g.Job.Req.Request,
Parameters: g.Job.Req.Parameters,
TimeoutSeconds: g.Job.Req.Timeout,
})

sig, err := gateway.Sign(sigPayload)
if err != nil {
return errors.New(fmt.Sprintf("Unable to sign request err=%v", err))
}
Expand Down
17 changes: 13 additions & 4 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (orch *orchestrator) PriceInfoForCaps(sender ethcommon.Address, manifestID
// priceInfo returns price per pixel as a fixed point number wrapped in a big.Rat
func (orch *orchestrator) priceInfo(sender ethcommon.Address, manifestID ManifestID, caps *net.Capabilities) (*big.Rat, error) {
// If there is already a fixed price for the given session, use this price
if manifestID != "" {
if manifestID != "" && orch.node.Balances != nil {
if balances, ok := orch.node.Balances.balances[sender]; ok {
fixedPrice := balances.FixedPrice(manifestID)
if fixedPrice != nil {
Expand Down Expand Up @@ -412,9 +412,18 @@ func (orch *orchestrator) priceInfo(sender ethcommon.Address, manifestID Manifes
continue
}
for modelID := range constraints.Models {
price := orch.node.GetBasePriceForCap(sender.String(), Capability(cap), modelID)
if price == nil {
price = orch.node.GetBasePriceForCap("default", Capability(cap), modelID)
var price *big.Rat
if Capability(cap) == Capability_BYOC {
// BYOC prices are stored in jobPriceInfo, keyed by capability name
price = orch.node.GetPriceForJob(sender.String(), modelID)
if price == nil || price.Sign() == 0 {
price = orch.node.GetPriceForJob("default", modelID)
}
} else {
price = orch.node.GetBasePriceForCap(sender.String(), Capability(cap), modelID)
if price == nil {
price = orch.node.GetBasePriceForCap("default", Capability(cap), modelID)
}
}

if price != nil {
Expand Down
Loading
Loading