Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func main() {
}
defer dbPool.Close()

var graphRepo reconciler.GraphRepository

repository := postgres.NewRepository(dbPool)

// Initialize GraphBuilder if graph DB feature is enabled
Expand All @@ -158,6 +160,7 @@ func main() {

// Use SQLC-generated queries directly for graph operations
graphQueries := dbpostgres.New(dbPool)
graphRepo = graphQueries

// Create GraphBuilder with graph queries
graphBuilder := reconciler.NewGraphBuilder(graphQueries, s.Logger())
Expand Down Expand Up @@ -222,7 +225,7 @@ func main() {
}

authorizer := authutil.NewContextAuthorizer(s.Logger())
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, serverInterceptors(authorizer, s.Logger())...)
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, graphRepo, serverInterceptors(authorizer, s.Logger())...)
if err != nil {
s.Logger().Error("failed to instantiate gRPC server", "error", err)
metricRecorder.RecordServiceStartupAttempt(ctx, false)
Expand Down
21 changes: 20 additions & 1 deletion diode-server/dbstore/postgres/queries/graph.sql
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,23 @@ LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');
UPDATE graph_nodes
SET metadata = graph_nodes.metadata || $3, updated_at = NOW()
WHERE node_type = $1 AND external_id = $2
RETURNING id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash;
RETURNING id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash;

-- name: ListGraphNodes :many
-- Lists graph nodes with optional filtering by types, metadata, and timestamp range.
-- All filters are optional - pass NULL/empty to skip filtering.
-- node_types: pass NULL or empty array to list all types
-- metadata_filter: pass NULL to skip metadata filtering (uses GIN index when provided)
-- ts_start/ts_end: pass NULL to skip timestamp range filtering
SELECT id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash
FROM graph_nodes
WHERE
-- Filter by node types (optional)
(sqlc.narg('node_types')::text[] IS NULL OR array_length(sqlc.narg('node_types')::text[], 1) IS NULL OR node_type = ANY(sqlc.narg('node_types')::text[]))
-- Filter by metadata (optional)
AND (sqlc.narg('metadata_filter')::jsonb IS NULL OR metadata @> sqlc.narg('metadata_filter')::jsonb)
-- Filter by timestamp range (optional)
AND (sqlc.narg('ts_start')::timestamptz IS NULL OR last_seen_ts >= sqlc.narg('ts_start')::timestamptz)
AND (sqlc.narg('ts_end')::timestamptz IS NULL OR last_seen_ts <= sqlc.narg('ts_end')::timestamptz)
ORDER BY last_seen_ts DESC, id
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');
68 changes: 68 additions & 0 deletions diode-server/gen/dbstore/postgres/graph.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion diode-server/ingester/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const bufSize = 1024 * 1024
func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))
mockRepository := mocks.NewRepository(t)
mockGraphRepository := mocks.NewGraphRepository(t)
authorizer := authutil.NewContextAuthorizer(logger)
serverInterceptors := []grpc.UnaryServerInterceptor{
authutil.NewUnverifiedJWTInterceptor(logger),
Expand All @@ -86,7 +87,7 @@ func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server
return handler(ctx, req)
},
}
server, err := reconciler.NewServer(ctx, logger, mockRepository, serverInterceptors...)
server, err := reconciler.NewServer(ctx, logger, mockRepository, mockGraphRepository, serverInterceptors...)
require.NoError(t, err)

errChan := make(chan error, 1)
Expand Down
231 changes: 231 additions & 0 deletions diode-server/reconciler/graph_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package reconciler

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"

"github.com/netboxlabs/diode/diode-server/entityhash"
"github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres"
"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
)

func createEntity(ctx context.Context, graphdb GraphRepository, req *reconcilerpb.CreateEntityRequest) (*reconcilerpb.CreateEntityResponse, error) {
entity := req.GetEntity()
if entity == nil || entity.GetEntity() == nil {
return nil, fmt.Errorf("entity is required")
}

// Extract node type from entity
nodeType := getEntityTypeName(entity)
if nodeType == "" {
return nil, fmt.Errorf("failed to determine entity type")
}

// Generate new UUID for external ID
externalID := uuid.New().String()

Comment on lines +34 to +36
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createEntity always generates a new UUID for externalID, so repeated calls with the same entity will create multiple rows (UpsertGraphNode only conflicts on (node_type, external_id)). This contradicts the RPC comment claiming idempotency and will inflate duplicates. Consider looking up an existing node first (e.g., by content hash/metadata) and reusing its external_id, or otherwise deriving a stable external_id for idempotency.

Copilot uses AI. Check for mistakes.
// Marshal entity data
entityData, err := protojson.Marshal(entity)
if err != nil {
return nil, fmt.Errorf("failed to marshal entity: %w", err)
}

// Convert request metadata to JSON
var metadata json.RawMessage
if req.GetMetadata() != nil {
metadataBytes, err := req.GetMetadata().MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}
metadata = metadataBytes
} else {
metadata = json.RawMessage("{}")
}

// Generate content hash for deduplication
fingerprinter := entityhash.NewEntityFingerprinter()
contentHash, err := fingerprinter.GenerateEntityHash(entity)
if err != nil {
return nil, fmt.Errorf("failed to generate entity hash: %w", err)
}

args := postgres.UpsertGraphNodeParams{
ExternalID: externalID,
NodeType: nodeType,
Data: entityData,
MatchingSchemaVersion: CurrentSchemaVersion,
Metadata: metadata,
ContentHash: pgtype.Text{String: contentHash, Valid: true},
}

node, err := graphdb.UpsertGraphNode(ctx, args)
if err != nil {
return nil, err
}

// Build response from upsert result
return &reconcilerpb.CreateEntityResponse{
Id: node.ExternalID,
ObjectType: node.NodeType,
}, nil
}

const (
defaultPageSize = 100
maxPageSize = 1000
)

func listEntities(ctx context.Context, graphdb GraphRepository, req *reconcilerpb.ListEntitiesRequest) (*reconcilerpb.ListEntitiesResponse, error) {
// Parse pagination parameters
pageSize := defaultPageSize
if req.GetPageSize() > 0 {
pageSize = int(req.GetPageSize())
Comment on lines +88 to +92
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New RPC behavior in createEntity/listEntities is added without unit tests. Please add tests using mocks.GraphRepository to cover pagination token parsing (valid/invalid), filter passthrough to ListGraphNodes, next page token generation, and create-entity idempotency/error paths.

Copilot uses AI. Check for mistakes.
if pageSize > maxPageSize {
pageSize = maxPageSize
}
}

// Parse page token (offset encoded as base64)
offset := int32(0)
if req.GetPageToken() != "" {
offsetBytes, err := base64.StdEncoding.DecodeString(req.GetPageToken())
if err != nil {
Comment on lines +98 to +102
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pagination token handling here duplicates logic and uses a different encoding (base64 of an ASCII integer) than the existing deviation pagination helpers (binary int32 + base64 in deviation.go). This inconsistency makes client implementations harder and increases maintenance cost. Consider reusing a shared helper or aligning token formats across endpoints.

Copilot uses AI. Check for mistakes.
return nil, fmt.Errorf("invalid page token: %w", err)
}
offsetVal, err := strconv.ParseInt(string(offsetBytes), 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid page token: %w", err)
}
if offsetVal < 0 {
return nil, fmt.Errorf("invalid page token: negative offset")
}
offset = int32(offsetVal)
}

// Build query parameters
params := postgres.ListGraphNodesParams{
Offset: offset,
Limit: int32(pageSize + 1), // Fetch one extra to determine if there's a next page
}

// Set object type filter
if len(req.GetObjectType()) > 0 {
params.NodeTypes = req.GetObjectType()
}

// Set metadata filter
if req.GetMetadataFilters() != nil {
metadataJSON, err := req.GetMetadataFilters().MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata filters: %w", err)
}
params.MetadataFilter = metadataJSON
}

// Set timestamp range filters
if req.IngestionTsStart != nil {
params.TsStart = pgtype.Timestamptz{
Time: time.Unix(0, req.GetIngestionTsStart()*int64(time.Millisecond)),
Valid: true,
}
}
if req.IngestionTsEnd != nil {
params.TsEnd = pgtype.Timestamptz{
Time: time.Unix(0, req.GetIngestionTsEnd()*int64(time.Millisecond)),
Valid: true,
}
}

// Execute query
nodes, err := graphdb.ListGraphNodes(ctx, params)
if err != nil {
return nil, fmt.Errorf("failed to list entities: %w", err)
}

// Determine if there's a next page
hasNextPage := len(nodes) > pageSize
if hasNextPage {
nodes = nodes[:pageSize] // Trim to requested page size
}

// Convert nodes to DiodeEntity
entities := make([]*reconcilerpb.DiodeEntity, 0, len(nodes))
for _, node := range nodes {
entity, err := graphNodeToDiodeEntity(node)
if err != nil {
return nil, fmt.Errorf("failed to convert node to entity: %w", err)
}
entities = append(entities, entity)
}

// Build response
resp := &reconcilerpb.ListEntitiesResponse{
Entities: entities,
}

// Set next page token if there are more results
if hasNextPage {
nextOffset := offset + int32(pageSize)
resp.NextPageToken = base64.StdEncoding.EncodeToString([]byte(strconv.Itoa(int(nextOffset))))
}

return resp, nil
}

// graphNodeToDiodeEntity converts a GraphNode to a DiodeEntity
func graphNodeToDiodeEntity(node postgres.GraphNode) (*reconcilerpb.DiodeEntity, error) {
// Unmarshal entity data
var entity diodepb.Entity
if err := protojson.Unmarshal(node.Data, &entity); err != nil {
return nil, fmt.Errorf("failed to unmarshal entity data: %w", err)
}

// Convert metadata to structpb.Struct
var sourceMetadata *structpb.Struct
if len(node.Metadata) > 0 {
var metadataMap map[string]interface{}
if err := json.Unmarshal(node.Metadata, &metadataMap); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
}
var err error
sourceMetadata, err = structpb.NewStruct(metadataMap)
if err != nil {
return nil, fmt.Errorf("failed to create struct from metadata: %w", err)
}
}

// Convert timestamps to milliseconds
var ingestionTs, createdAt, updatedAt, lastSeenTs int64
if node.LastSeenTs.Valid {
ingestionTs = node.LastSeenTs.Time.UnixMilli()
lastSeenTs = node.LastSeenTs.Time.UnixMilli()
}
if node.CreatedAt.Valid {
createdAt = node.CreatedAt.Time.UnixMilli()
}
if node.UpdatedAt.Valid {
updatedAt = node.UpdatedAt.Time.UnixMilli()
}

return &reconcilerpb.DiodeEntity{
Id: node.ExternalID,
ObjectType: node.NodeType,
IngestionTs: ingestionTs,
Entity: &entity,
SourceMetadata: sourceMetadata,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
LastSeenTs: lastSeenTs,
DuplicateCount: node.DuplicateCount,
}, nil
}
1 change: 1 addition & 0 deletions diode-server/reconciler/graph_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type GraphRepository interface {
GetGraphNodesByType(ctx context.Context, arg postgres.GetGraphNodesByTypeParams) ([]postgres.GraphNode, error)
FindNodeByMetadata(ctx context.Context, arg postgres.FindNodeByMetadataParams) (postgres.GraphNode, error)
FindNodeByContentHash(ctx context.Context, arg postgres.FindNodeByContentHashParams) (postgres.GraphNode, error)
ListGraphNodes(ctx context.Context, arg postgres.ListGraphNodesParams) ([]postgres.GraphNode, error)

// Snapshot management
InsertSnapshot(ctx context.Context, arg postgres.InsertSnapshotParams) (postgres.GraphNodeSnapshot, error)
Expand Down
Loading