Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fd13879
feat: Introduce computation runner, log forwarder, ingress, and egres…
SammyOina Dec 17, 2025
fd26a90
feat: Update Go environment variable parsing and build system to use …
SammyOina Dec 17, 2025
800066f
feat: Update package sources to `sammyoina/cocos-ai` at a specific co…
SammyOina Dec 17, 2025
76cc6d8
chore: Update build system references to a specific commit and enhanc…
SammyOina Dec 17, 2025
9ea072c
build: Update package source repositories and versions, migrate clien…
SammyOina Dec 17, 2025
3feb2c5
debug stuck
SammyOina Dec 17, 2025
93940c3
debug
SammyOina Dec 17, 2025
eb418cd
debug
SammyOina Dec 17, 2025
0fbb988
feat: add HTTP/2 support to egress proxy and update build system to u…
SammyOina Dec 17, 2025
5869257
feat: enhance egress proxy CONNECT handling, update package sources, …
SammyOina Dec 18, 2025
ce8ca58
feat: Update build system for various services to a specific commit f…
SammyOina Dec 18, 2025
2af5775
feat: Migrate agent-internal gRPC communication to Unix sockets, set …
SammyOina Dec 18, 2025
15dd3fb
refactor: Remove standalone ingress-proxy systemd service and update …
SammyOina Dec 18, 2025
d92b805
fix: Prevent computation re-initialization in agent and update compon…
SammyOina Dec 18, 2025
569ae29
feat: update package versions and enable h2c support in ingress proxy.
SammyOina Dec 18, 2025
3729fd6
feat: refactor ingress proxy to support HTTP/2 over Unix sockets and …
SammyOina Dec 18, 2025
015f9a7
feat: Update build system package sources to `ultravioletrs/cocos` an…
SammyOina Dec 18, 2025
de41d51
refactor: improve error handling in proxy commands and remove unused …
SammyOina Dec 18, 2025
f28f44b
test: add mock service state return value in handleRunReqChunks test
SammyOina Dec 18, 2025
326a058
feat: add comprehensive tests for service and proxy components
SammyOina Dec 22, 2025
b9d8ad5
fix linter
SammyOina Dec 22, 2025
c221bfb
improve coverage
SammyOina Dec 22, 2025
954c4c5
test: add gRPC client and ingress adapter tests, and update egress pr…
SammyOina Dec 22, 2025
42f2ff0
improve coverage
SammyOina Dec 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,10 @@ packages:
dir: '{{.InterfaceDir}}/mocks'
structname: '{{.InterfaceName}}'
filename: "{{.InterfaceName | lower}}.go"
github.com/ultravioletrs/cocos/pkg/clients/grpc/runner:
interfaces:
Client:
config:
dir: '{{.InterfaceDir}}/mocks'
structname: '{{.InterfaceName}}'
filename: "{{.InterfaceName | lower}}.go"
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
BUILD_DIR = build
SERVICES = manager agent cli attestation-service
SERVICES = manager agent cli attestation-service log-forwarder computation-runner egress-proxy ingress-proxy
ATTESTATION_POLICY = attestation_policy
CGO_ENABLED ?= 0
GOARCH ?= amd64
Expand Down Expand Up @@ -41,6 +41,8 @@ protoc:
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/events/events.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/cvms/cvms.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/proto/attestation/v1/attestation.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/log/log.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/runner/runner.proto

mocks:
mockery --config ./.mockery.yml
Expand Down
1 change: 0 additions & 1 deletion agent/computations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
var _ fmt.Stringer = (*Datasets)(nil)

type AgentConfig struct {
Port string `json:"port,omitempty"`
CertFile string `json:"cert_file,omitempty"`
KeyFile string `json:"server_key,omitempty"`
ServerCAFile string `json:"server_ca_file,omitempty"`
Expand Down
13 changes: 6 additions & 7 deletions agent/computations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,15 @@ func TestDecompressToContext(t *testing.T) {
}

func TestAgentConfigJSON(t *testing.T) {
config := AgentConfig{
Port: "8080",
cfg := AgentConfig{
CertFile: "cert.pem",
KeyFile: "key.pem",
ServerCAFile: "server_ca.pem",
ClientCAFile: "client_ca.pem",
ServerCAFile: "server-ca.pem",
ClientCAFile: "client-ca.pem",
AttestedTls: true,
}

data, err := json.Marshal(config)
data, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Failed to marshal AgentConfig: %v", err)
}
Expand All @@ -125,7 +124,7 @@ func TestAgentConfigJSON(t *testing.T) {
t.Fatalf("Failed to unmarshal AgentConfig: %v", err)
}

if !reflect.DeepEqual(config, unmarshaledConfig) {
t.Errorf("Unmarshaled config does not match original. Got %+v, want %+v", unmarshaledConfig, config)
if !reflect.DeepEqual(cfg, unmarshaledConfig) {
t.Errorf("Unmarshaled config does not match original. Got %+v, want %+v", unmarshaledConfig, cfg)
}
}
41 changes: 39 additions & 2 deletions agent/cvms/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package grpc
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/ultravioletrs/cocos/pkg/attestation"
"github.com/ultravioletrs/cocos/pkg/attestation/vtpm"
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
"github.com/ultravioletrs/cocos/pkg/ingress"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -44,13 +46,14 @@ type CVMSClient struct {
logger *slog.Logger
runReqManager *runRequestManager
sp server.AgentServer
ingressProxy ingress.ProxyServer
storage storage.Storage
reconnectFn func(context.Context) (grpc.Client, cvms.Service_ProcessClient, error)
grpcClient grpc.Client
}

// NewClient returns new gRPC client instance.
func NewClient(stream cvms.Service_ProcessClient, svc agent.Service, messageQueue chan *cvms.ClientStreamMessage, logger *slog.Logger, sp server.AgentServer, storageDir string, reconnectFn func(context.Context) (grpc.Client, cvms.Service_ProcessClient, error), grpcClient grpc.Client) (*CVMSClient, error) {
func NewClient(stream cvms.Service_ProcessClient, svc agent.Service, messageQueue chan *cvms.ClientStreamMessage, logger *slog.Logger, sp server.AgentServer, ingressProxy ingress.ProxyServer, storageDir string, reconnectFn func(context.Context) (grpc.Client, cvms.Service_ProcessClient, error), grpcClient grpc.Client) (*CVMSClient, error) {
store, err := storage.NewFileStorage(storageDir)
if err != nil {
return nil, err
Expand All @@ -63,6 +66,7 @@ func NewClient(stream cvms.Service_ProcessClient, svc agent.Service, messageQueu
logger: logger,
runReqManager: newRunRequestManager(),
sp: sp,
ingressProxy: ingressProxy,
storage: store,
reconnectFn: reconnectFn,
grpcClient: grpcClient,
Expand Down Expand Up @@ -205,14 +209,17 @@ func (client *CVMSClient) handleAgentStateReq(mes *cvms.ServerStreamMessage_Agen
}

func (client *CVMSClient) handleRunReqChunks(ctx context.Context, msg *cvms.ServerStreamMessage_RunReqChunks) error {
client.logger.Debug("Received RunReq chunk", "id", msg.RunReqChunks.Id, "size", len(msg.RunReqChunks.Data), "isLast", msg.RunReqChunks.IsLast)
buffer, complete := client.runReqManager.addChunk(msg.RunReqChunks.Id, msg.RunReqChunks.Data, msg.RunReqChunks.IsLast)

if complete {
client.logger.Info("Received complete computation run request", "id", msg.RunReqChunks.Id, "totalSize", len(buffer))
var runReq cvms.ComputationRunReq
if err := proto.Unmarshal(buffer, &runReq); err != nil {
return errors.Wrap(err, errCorruptedManifest)
}

client.logger.Info("Starting computation execution", "computationId", runReq.Id, "name", runReq.Name)
go client.executeRun(ctx, &runReq)
}

Expand Down Expand Up @@ -246,6 +253,15 @@ func (client *CVMSClient) executeRun(ctx context.Context, runReq *cvms.Computati
})
}

// Check if the agent is in the correct state to initialize a new computation.
// If the agent is already processing this computation (e.g., after a reconnection),
// skip initialization to avoid state errors.
currentState := client.svc.State()
if currentState != "ReceivingManifest" {
client.logger.Info("Agent already processing computation, skipping initialization", "state", currentState, "computationId", runReq.Id)
return
}

if err := client.svc.InitComputation(ctx, ac); err != nil {
client.logger.Warn(err.Error())
return
Expand All @@ -267,7 +283,6 @@ func (client *CVMSClient) executeRun(ctx context.Context, runReq *cvms.Computati
}

if err := client.sp.Start(agent.AgentConfig{
Port: runReq.AgentConfig.Port,
CertFile: runReq.AgentConfig.CertFile,
KeyFile: runReq.AgentConfig.KeyFile,
ServerCAFile: runReq.AgentConfig.ServerCaFile,
Expand All @@ -278,6 +293,22 @@ func (client *CVMSClient) executeRun(ctx context.Context, runReq *cvms.Computati
runRes.RunRes.Error = err.Error()
}

// Start ingress proxy if available
if client.ingressProxy != nil {
if err := client.ingressProxy.Start(
ingress.AgentConfigToProxyConfig(agent.AgentConfig{
CertFile: runReq.AgentConfig.CertFile,
KeyFile: runReq.AgentConfig.KeyFile,
ServerCAFile: runReq.AgentConfig.ServerCaFile,
ClientCAFile: runReq.AgentConfig.ClientCaFile,
AttestedTls: runReq.AgentConfig.AttestedTls,
}),
ingress.ComputationToProxyContext(ac),
); err != nil {
client.logger.Warn(fmt.Sprintf("failed to start ingress proxy: %s", err.Error()))
}
}

defer func() {
if ccPlatform == attestation.Azure || ccPlatform == attestation.SNPvTPM {
cmpJson, err := json.Marshal(ac)
Expand Down Expand Up @@ -309,6 +340,12 @@ func (client *CVMSClient) handleStopComputation(ctx context.Context, mes *cvms.S
if err := client.sp.Stop(); err != nil {
msg.StopComputationRes.Message = err.Error()
}
// Stop ingress proxy if available
if client.ingressProxy != nil {
if err := client.ingressProxy.Stop(); err != nil {
client.logger.Warn(fmt.Sprintf("failed to stop ingress proxy: %s", err.Error()))
}
}
client.mu.Unlock()

client.sendMessage(&cvms.ClientStreamMessage{Message: msg})
Expand Down
Loading