diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index bd1ae6da..0c532b50 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -134,6 +134,8 @@ func main() { } defer dbPool.Close() + var graphRepo reconciler.GraphRepository + repository := postgres.NewRepository(dbPool) // Initialize GraphBuilder if graph DB feature is enabled @@ -158,6 +160,7 @@ func main() { // Use SQLC-generated queries directly for graph operations graphQueries := dbpostgres.New(dbPool) + graphRepo = graphQueries // Create GraphBuilder with graph queries graphBuilder := reconciler.NewGraphBuilder(graphQueries, s.Logger()) @@ -222,7 +225,7 @@ func main() { } authorizer := authutil.NewContextAuthorizer(s.Logger()) - gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, serverInterceptors(authorizer, s.Logger())...) + gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, graphRepo, serverInterceptors(authorizer, s.Logger())...) if err != nil { s.Logger().Error("failed to instantiate gRPC server", "error", err) metricRecorder.RecordServiceStartupAttempt(ctx, false) diff --git a/diode-server/dbstore/postgres/queries/graph.sql b/diode-server/dbstore/postgres/queries/graph.sql index b15f6c77..dd7fabf0 100644 --- a/diode-server/dbstore/postgres/queries/graph.sql +++ b/diode-server/dbstore/postgres/queries/graph.sql @@ -266,4 +266,23 @@ LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); UPDATE graph_nodes SET metadata = graph_nodes.metadata || $3, updated_at = NOW() WHERE node_type = $1 AND external_id = $2 -RETURNING id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash; \ No newline at end of file +RETURNING id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash; + +-- name: ListGraphNodes :many +-- Lists graph nodes with optional filtering by types, metadata, and timestamp range. +-- All filters are optional - pass NULL/empty to skip filtering. +-- node_types: pass NULL or empty array to list all types +-- metadata_filter: pass NULL to skip metadata filtering (uses GIN index when provided) +-- ts_start/ts_end: pass NULL to skip timestamp range filtering +SELECT id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash +FROM graph_nodes +WHERE + -- Filter by node types (optional) + (sqlc.narg('node_types')::text[] IS NULL OR array_length(sqlc.narg('node_types')::text[], 1) IS NULL OR node_type = ANY(sqlc.narg('node_types')::text[])) + -- Filter by metadata (optional) + AND (sqlc.narg('metadata_filter')::jsonb IS NULL OR metadata @> sqlc.narg('metadata_filter')::jsonb) + -- Filter by timestamp range (optional) + AND (sqlc.narg('ts_start')::timestamptz IS NULL OR last_seen_ts >= sqlc.narg('ts_start')::timestamptz) + AND (sqlc.narg('ts_end')::timestamptz IS NULL OR last_seen_ts <= sqlc.narg('ts_end')::timestamptz) +ORDER BY last_seen_ts DESC, id +LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); \ No newline at end of file diff --git a/diode-server/gen/dbstore/postgres/graph.sql.go b/diode-server/gen/dbstore/postgres/graph.sql.go index 4ef15c33..96e6a982 100644 --- a/diode-server/gen/dbstore/postgres/graph.sql.go +++ b/diode-server/gen/dbstore/postgres/graph.sql.go @@ -886,6 +886,74 @@ func (q *Queries) InsertSnapshot(ctx context.Context, arg InsertSnapshotParams) return i, err } +const listGraphNodes = `-- name: ListGraphNodes :many +SELECT id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash +FROM graph_nodes +WHERE + -- Filter by node types (optional) + ($1::text[] IS NULL OR array_length($1::text[], 1) IS NULL OR node_type = ANY($1::text[])) + -- Filter by metadata (optional) + AND ($2::jsonb IS NULL OR metadata @> $2::jsonb) + -- Filter by timestamp range (optional) + AND ($3::timestamptz IS NULL OR last_seen_ts >= $3::timestamptz) + AND ($4::timestamptz IS NULL OR last_seen_ts <= $4::timestamptz) +ORDER BY last_seen_ts DESC, id +LIMIT $6 OFFSET $5 +` + +type ListGraphNodesParams struct { + NodeTypes []string `json:"node_types"` + MetadataFilter []byte `json:"metadata_filter"` + TsStart pgtype.Timestamptz `json:"ts_start"` + TsEnd pgtype.Timestamptz `json:"ts_end"` + Offset int32 `json:"offset"` + Limit int32 `json:"limit"` +} + +// Lists graph nodes with optional filtering by types, metadata, and timestamp range. +// All filters are optional - pass NULL/empty to skip filtering. +// node_types: pass NULL or empty array to list all types +// metadata_filter: pass NULL to skip metadata filtering (uses GIN index when provided) +// ts_start/ts_end: pass NULL to skip timestamp range filtering +func (q *Queries) ListGraphNodes(ctx context.Context, arg ListGraphNodesParams) ([]GraphNode, error) { + rows, err := q.db.Query(ctx, listGraphNodes, + arg.NodeTypes, + arg.MetadataFilter, + arg.TsStart, + arg.TsEnd, + arg.Offset, + arg.Limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GraphNode + for rows.Next() { + var i GraphNode + if err := rows.Scan( + &i.ID, + &i.ExternalID, + &i.NodeType, + &i.Data, + &i.DuplicateCount, + &i.MatchingSchemaVersion, + &i.CreatedAt, + &i.UpdatedAt, + &i.LastSeenTs, + &i.Metadata, + &i.ContentHash, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const searchGraphNodes = `-- name: SearchGraphNodes :many SELECT id, external_id, node_type, data, duplicate_count, matching_schema_version, created_at, updated_at, last_seen_ts, metadata, content_hash FROM graph_nodes diff --git a/diode-server/ingester/component_test.go b/diode-server/ingester/component_test.go index 3cdc4963..b80aee37 100644 --- a/diode-server/ingester/component_test.go +++ b/diode-server/ingester/component_test.go @@ -76,6 +76,7 @@ const bufSize = 1024 * 1024 func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) mockRepository := mocks.NewRepository(t) + mockGraphRepository := mocks.NewGraphRepository(t) authorizer := authutil.NewContextAuthorizer(logger) serverInterceptors := []grpc.UnaryServerInterceptor{ authutil.NewUnverifiedJWTInterceptor(logger), @@ -86,7 +87,7 @@ func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server return handler(ctx, req) }, } - server, err := reconciler.NewServer(ctx, logger, mockRepository, serverInterceptors...) + server, err := reconciler.NewServer(ctx, logger, mockRepository, mockGraphRepository, serverInterceptors...) require.NoError(t, err) errChan := make(chan error, 1) diff --git a/diode-server/reconciler/graph_endpoints.go b/diode-server/reconciler/graph_endpoints.go new file mode 100644 index 00000000..6dd11e50 --- /dev/null +++ b/diode-server/reconciler/graph_endpoints.go @@ -0,0 +1,231 @@ +package reconciler + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/netboxlabs/diode/diode-server/entityhash" + "github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" +) + +func createEntity(ctx context.Context, graphdb GraphRepository, req *reconcilerpb.CreateEntityRequest) (*reconcilerpb.CreateEntityResponse, error) { + entity := req.GetEntity() + if entity == nil || entity.GetEntity() == nil { + return nil, fmt.Errorf("entity is required") + } + + // Extract node type from entity + nodeType := getEntityTypeName(entity) + if nodeType == "" { + return nil, fmt.Errorf("failed to determine entity type") + } + + // Generate new UUID for external ID + externalID := uuid.New().String() + + // Marshal entity data + entityData, err := protojson.Marshal(entity) + if err != nil { + return nil, fmt.Errorf("failed to marshal entity: %w", err) + } + + // Convert request metadata to JSON + var metadata json.RawMessage + if req.GetMetadata() != nil { + metadataBytes, err := req.GetMetadata().MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal metadata: %w", err) + } + metadata = metadataBytes + } else { + metadata = json.RawMessage("{}") + } + + // Generate content hash for deduplication + fingerprinter := entityhash.NewEntityFingerprinter() + contentHash, err := fingerprinter.GenerateEntityHash(entity) + if err != nil { + return nil, fmt.Errorf("failed to generate entity hash: %w", err) + } + + args := postgres.UpsertGraphNodeParams{ + ExternalID: externalID, + NodeType: nodeType, + Data: entityData, + MatchingSchemaVersion: CurrentSchemaVersion, + Metadata: metadata, + ContentHash: pgtype.Text{String: contentHash, Valid: true}, + } + + node, err := graphdb.UpsertGraphNode(ctx, args) + if err != nil { + return nil, err + } + + // Build response from upsert result + return &reconcilerpb.CreateEntityResponse{ + Id: node.ExternalID, + ObjectType: node.NodeType, + }, nil +} + +const ( + defaultPageSize = 100 + maxPageSize = 1000 +) + +func listEntities(ctx context.Context, graphdb GraphRepository, req *reconcilerpb.ListEntitiesRequest) (*reconcilerpb.ListEntitiesResponse, error) { + // Parse pagination parameters + pageSize := defaultPageSize + if req.GetPageSize() > 0 { + pageSize = int(req.GetPageSize()) + if pageSize > maxPageSize { + pageSize = maxPageSize + } + } + + // Parse page token (offset encoded as base64) + offset := int32(0) + if req.GetPageToken() != "" { + offsetBytes, err := base64.StdEncoding.DecodeString(req.GetPageToken()) + if err != nil { + return nil, fmt.Errorf("invalid page token: %w", err) + } + offsetVal, err := strconv.ParseInt(string(offsetBytes), 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid page token: %w", err) + } + if offsetVal < 0 { + return nil, fmt.Errorf("invalid page token: negative offset") + } + offset = int32(offsetVal) + } + + // Build query parameters + params := postgres.ListGraphNodesParams{ + Offset: offset, + Limit: int32(pageSize + 1), // Fetch one extra to determine if there's a next page + } + + // Set object type filter + if len(req.GetObjectType()) > 0 { + params.NodeTypes = req.GetObjectType() + } + + // Set metadata filter + if req.GetMetadataFilters() != nil { + metadataJSON, err := req.GetMetadataFilters().MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal metadata filters: %w", err) + } + params.MetadataFilter = metadataJSON + } + + // Set timestamp range filters + if req.IngestionTsStart != nil { + params.TsStart = pgtype.Timestamptz{ + Time: time.Unix(0, req.GetIngestionTsStart()*int64(time.Millisecond)), + Valid: true, + } + } + if req.IngestionTsEnd != nil { + params.TsEnd = pgtype.Timestamptz{ + Time: time.Unix(0, req.GetIngestionTsEnd()*int64(time.Millisecond)), + Valid: true, + } + } + + // Execute query + nodes, err := graphdb.ListGraphNodes(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to list entities: %w", err) + } + + // Determine if there's a next page + hasNextPage := len(nodes) > pageSize + if hasNextPage { + nodes = nodes[:pageSize] // Trim to requested page size + } + + // Convert nodes to DiodeEntity + entities := make([]*reconcilerpb.DiodeEntity, 0, len(nodes)) + for _, node := range nodes { + entity, err := graphNodeToDiodeEntity(node) + if err != nil { + return nil, fmt.Errorf("failed to convert node to entity: %w", err) + } + entities = append(entities, entity) + } + + // Build response + resp := &reconcilerpb.ListEntitiesResponse{ + Entities: entities, + } + + // Set next page token if there are more results + if hasNextPage { + nextOffset := offset + int32(pageSize) + resp.NextPageToken = base64.StdEncoding.EncodeToString([]byte(strconv.Itoa(int(nextOffset)))) + } + + return resp, nil +} + +// graphNodeToDiodeEntity converts a GraphNode to a DiodeEntity +func graphNodeToDiodeEntity(node postgres.GraphNode) (*reconcilerpb.DiodeEntity, error) { + // Unmarshal entity data + var entity diodepb.Entity + if err := protojson.Unmarshal(node.Data, &entity); err != nil { + return nil, fmt.Errorf("failed to unmarshal entity data: %w", err) + } + + // Convert metadata to structpb.Struct + var sourceMetadata *structpb.Struct + if len(node.Metadata) > 0 { + var metadataMap map[string]interface{} + if err := json.Unmarshal(node.Metadata, &metadataMap); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + var err error + sourceMetadata, err = structpb.NewStruct(metadataMap) + if err != nil { + return nil, fmt.Errorf("failed to create struct from metadata: %w", err) + } + } + + // Convert timestamps to milliseconds + var ingestionTs, createdAt, updatedAt, lastSeenTs int64 + if node.LastSeenTs.Valid { + ingestionTs = node.LastSeenTs.Time.UnixMilli() + lastSeenTs = node.LastSeenTs.Time.UnixMilli() + } + if node.CreatedAt.Valid { + createdAt = node.CreatedAt.Time.UnixMilli() + } + if node.UpdatedAt.Valid { + updatedAt = node.UpdatedAt.Time.UnixMilli() + } + + return &reconcilerpb.DiodeEntity{ + Id: node.ExternalID, + ObjectType: node.NodeType, + IngestionTs: ingestionTs, + Entity: &entity, + SourceMetadata: sourceMetadata, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + LastSeenTs: lastSeenTs, + DuplicateCount: node.DuplicateCount, + }, nil +} diff --git a/diode-server/reconciler/graph_repository.go b/diode-server/reconciler/graph_repository.go index d103f925..0a18be57 100644 --- a/diode-server/reconciler/graph_repository.go +++ b/diode-server/reconciler/graph_repository.go @@ -37,6 +37,7 @@ type GraphRepository interface { GetGraphNodesByType(ctx context.Context, arg postgres.GetGraphNodesByTypeParams) ([]postgres.GraphNode, error) FindNodeByMetadata(ctx context.Context, arg postgres.FindNodeByMetadataParams) (postgres.GraphNode, error) FindNodeByContentHash(ctx context.Context, arg postgres.FindNodeByContentHashParams) (postgres.GraphNode, error) + ListGraphNodes(ctx context.Context, arg postgres.ListGraphNodesParams) ([]postgres.GraphNode, error) // Snapshot management InsertSnapshot(ctx context.Context, arg postgres.InsertSnapshotParams) (postgres.GraphNodeSnapshot, error) diff --git a/diode-server/reconciler/mocks/graphrepository.go b/diode-server/reconciler/mocks/graphrepository.go index cb93d4ed..db363185 100644 --- a/diode-server/reconciler/mocks/graphrepository.go +++ b/diode-server/reconciler/mocks/graphrepository.go @@ -706,6 +706,65 @@ func (_c *GraphRepository_InsertSnapshot_Call) RunAndReturn(run func(context.Con return _c } +// ListGraphNodes provides a mock function with given fields: ctx, arg +func (_m *GraphRepository) ListGraphNodes(ctx context.Context, arg postgres.ListGraphNodesParams) ([]postgres.GraphNode, error) { + ret := _m.Called(ctx, arg) + + if len(ret) == 0 { + panic("no return value specified for ListGraphNodes") + } + + var r0 []postgres.GraphNode + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, postgres.ListGraphNodesParams) ([]postgres.GraphNode, error)); ok { + return rf(ctx, arg) + } + if rf, ok := ret.Get(0).(func(context.Context, postgres.ListGraphNodesParams) []postgres.GraphNode); ok { + r0 = rf(ctx, arg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]postgres.GraphNode) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, postgres.ListGraphNodesParams) error); ok { + r1 = rf(ctx, arg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GraphRepository_ListGraphNodes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListGraphNodes' +type GraphRepository_ListGraphNodes_Call struct { + *mock.Call +} + +// ListGraphNodes is a helper method to define mock.On call +// - ctx context.Context +// - arg postgres.ListGraphNodesParams +func (_e *GraphRepository_Expecter) ListGraphNodes(ctx interface{}, arg interface{}) *GraphRepository_ListGraphNodes_Call { + return &GraphRepository_ListGraphNodes_Call{Call: _e.mock.On("ListGraphNodes", ctx, arg)} +} + +func (_c *GraphRepository_ListGraphNodes_Call) Run(run func(ctx context.Context, arg postgres.ListGraphNodesParams)) *GraphRepository_ListGraphNodes_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(postgres.ListGraphNodesParams)) + }) + return _c +} + +func (_c *GraphRepository_ListGraphNodes_Call) Return(_a0 []postgres.GraphNode, _a1 error) *GraphRepository_ListGraphNodes_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *GraphRepository_ListGraphNodes_Call) RunAndReturn(run func(context.Context, postgres.ListGraphNodesParams) ([]postgres.GraphNode, error)) *GraphRepository_ListGraphNodes_Call { + _c.Call.Return(run) + return _c +} + // UpdateGraphNodeData provides a mock function with given fields: ctx, arg func (_m *GraphRepository) UpdateGraphNodeData(ctx context.Context, arg postgres.UpdateGraphNodeDataParams) (postgres.GraphNode, error) { ret := _m.Called(ctx, arg) diff --git a/diode-server/reconciler/server.go b/diode-server/reconciler/server.go index a42b07fc..1ff44911 100644 --- a/diode-server/reconciler/server.go +++ b/diode-server/reconciler/server.go @@ -27,10 +27,11 @@ type Server struct { grpcServer *grpc.Server redisClient RedisClient repository Repository + graphdb GraphRepository } // NewServer creates a new reconciler server -func NewServer(ctx context.Context, logger *slog.Logger, repository Repository, serverInterceptors ...grpc.UnaryServerInterceptor) (*Server, error) { +func NewServer(ctx context.Context, logger *slog.Logger, repository Repository, graphdb GraphRepository, serverInterceptors ...grpc.UnaryServerInterceptor) (*Server, error) { var cfg Config envconfig.MustProcess("", &cfg) @@ -71,6 +72,7 @@ func NewServer(ctx context.Context, logger *slog.Logger, repository Repository, grpcServer: grpcServer, redisClient: redisClient, repository: repository, + graphdb: graphdb, } reconcilerpb.RegisterReconcilerServiceServer(grpcServer, component) @@ -118,11 +120,17 @@ func (s *Server) RetrieveDeviationByID(ctx context.Context, req *reconcilerpb.Re } // ListEntities lists observed entities with filtering -func (s *Server) ListEntities(_ context.Context, _ *reconcilerpb.ListEntitiesRequest) (*reconcilerpb.ListEntitiesResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ListEntities not implemented") +func (s *Server) ListEntities(ctx context.Context, req *reconcilerpb.ListEntitiesRequest) (*reconcilerpb.ListEntitiesResponse, error) { + if s.graphdb == nil { + return nil, status.Errorf(codes.Unavailable, "graph database not available") + } + return listEntities(ctx, s.graphdb, req) } // CreateEntity creates an entity synchronously in the graph database (idempotent - returns existing ID if entity already exists) -func (s *Server) CreateEntity(_ context.Context, _ *reconcilerpb.CreateEntityRequest) (*reconcilerpb.CreateEntityResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method CreateEntity not implemented") +func (s *Server) CreateEntity(ctx context.Context, req *reconcilerpb.CreateEntityRequest) (*reconcilerpb.CreateEntityResponse, error) { + if s.graphdb == nil { + return nil, status.Errorf(codes.Unavailable, "graph database not available") + } + return createEntity(ctx, s.graphdb, req) } diff --git a/diode-server/reconciler/server_test.go b/diode-server/reconciler/server_test.go index e07693c1..2f1aae29 100644 --- a/diode-server/reconciler/server_test.go +++ b/diode-server/reconciler/server_test.go @@ -26,6 +26,7 @@ func TestNewServer(t *testing.T) { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) mockRepository := mocks.NewRepository(t) + mockGraphRepository := mocks.NewGraphRepository(t) authorizer := authutil.NewContextAuthorizer(logger) serverInterceptors := []grpc.UnaryServerInterceptor{ authutil.NewUnverifiedJWTInterceptor(logger), @@ -36,7 +37,7 @@ func TestNewServer(t *testing.T) { return handler(ctx, req) }, } - server, err := reconciler.NewServer(ctx, logger, mockRepository, serverInterceptors...) + server, err := reconciler.NewServer(ctx, logger, mockRepository, mockGraphRepository, serverInterceptors...) require.NoError(t, err) require.NotNil(t, server)