Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion diode-server/cmd/protograph/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// CreateEntityFromInterface creates a diodepb.Entity from a typed interface value.
// This replaces the large manual switch statements in the GraphBuilder.
// This replaces the large manual switch statements in the graph Service.
func CreateEntityFromInterface(fieldValue any) *diodepb.Entity {
if fieldValue == nil {
return nil
Expand Down
22 changes: 11 additions & 11 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ func main() {

repository := postgres.NewRepository(dbPool)

// Initialize GraphBuilder if graph DB feature is enabled
var graphBuilderOpt reconciler.ProcessorOption
// Initialize graph Service if graph DB feature is enabled
var graphServiceOpt reconciler.ProcessorOption
if cfg.EnableGraphDB {
s.Logger().Info("graph DB feature enabled, initializing GraphBuilder")
s.Logger().Info("graph DB feature enabled, initializing graph Service")

// Load matching configuration if provided
var matchingConfig *matching.Config
Expand All @@ -159,10 +159,10 @@ func main() {
// Create graph repository adapter
graphRepo := postgres.NewGraphRepository(dbPool)

// Create Builder with graph repository
var builderOpts []graph.BuilderOption
// Create graph Service with repository
var opts []graph.Option
if matchingConfig != nil {
builderOpts = append(builderOpts, graph.WithMatchingConfig(matchingConfig))
opts = append(opts, graph.WithMatchingConfig(matchingConfig))

// Create entity matcher for confidence-based matching
matcherConfig := &matching.EntityMatchingConfig{
Expand All @@ -173,11 +173,11 @@ func main() {
MaxCacheSize: 1000,
}
entityMatcher := entitymatcher.NewMatcher(graphRepo, matcherConfig, s.Logger())
builderOpts = append(builderOpts, graph.WithEntityMatcher(entityMatcher))
opts = append(opts, graph.WithEntityMatcher(entityMatcher))
}
graphBuilder := graph.NewBuilder(graphRepo, s.Logger(), builderOpts...)
graphService := graph.NewService(graphRepo, s.Logger(), opts...)

graphBuilderOpt = reconciler.WithGraphBuilder(graphBuilder)
graphServiceOpt = reconciler.WithGraphService(graphService)
}

diodeToNetBoxMaxRetries := 3
Expand All @@ -203,8 +203,8 @@ func main() {

// Build processor options
var processorOpts []reconciler.ProcessorOption
if graphBuilderOpt != nil {
processorOpts = append(processorOpts, graphBuilderOpt)
if graphServiceOpt != nil {
processorOpts = append(processorOpts, graphServiceOpt)
}

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, metricRecorder, processorOpts...)
Expand Down
161 changes: 103 additions & 58 deletions diode-server/dbstore/postgres/graph_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,16 @@ func toNode(n postgres.GraphNode) graph.Node {
contentHash = &n.ContentHash.String
}
return graph.Node{
ID: n.ID,
ExternalID: n.ExternalID,
NodeType: n.NodeType,
Data: json.RawMessage(n.Data),
DuplicateCount: n.DuplicateCount,
MatchingSchemaVersion: n.MatchingSchemaVersion,
CreatedAt: n.CreatedAt.Time,
UpdatedAt: n.UpdatedAt.Time,
LastSeenTs: lastSeen,
Metadata: json.RawMessage(n.Metadata),
ContentHash: contentHash,
ID: n.ID,
ExternalID: n.ExternalID,
NodeType: n.NodeType,
Data: json.RawMessage(n.Data),
DuplicateCount: n.DuplicateCount,
CreatedAt: n.CreatedAt.Time,
UpdatedAt: n.UpdatedAt.Time,
LastSeenTs: lastSeen,
Metadata: json.RawMessage(n.Metadata),
ContentHash: contentHash,
}
}

Expand Down Expand Up @@ -97,12 +96,11 @@ func wrapNotFound(err error) error {
// UpsertNode implements graph.Repository.
func (r *GraphRepository) UpsertNode(ctx context.Context, arg graph.UpsertNodeParams) (graph.Node, error) {
result, err := r.queries.UpsertGraphNode(ctx, postgres.UpsertGraphNodeParams{
ExternalID: arg.ExternalID,
NodeType: arg.NodeType,
Data: []byte(arg.Data),
MatchingSchemaVersion: arg.MatchingSchemaVersion,
Metadata: []byte(arg.Metadata),
ContentHash: toOptionalPgText(arg.ContentHash),
ExternalID: arg.ExternalID,
NodeType: arg.NodeType,
Data: []byte(arg.Data),
Metadata: []byte(arg.Metadata),
ContentHash: toOptionalPgText(arg.ContentHash),
})
if err != nil {
return graph.Node{}, err
Expand All @@ -113,12 +111,11 @@ func (r *GraphRepository) UpsertNode(ctx context.Context, arg graph.UpsertNodePa
// UpdateNodeData implements graph.Repository.
func (r *GraphRepository) UpdateNodeData(ctx context.Context, arg graph.UpdateNodeDataParams) (graph.Node, error) {
result, err := r.queries.UpdateGraphNodeData(ctx, postgres.UpdateGraphNodeDataParams{
NodeType: arg.NodeType,
ExternalID: arg.ExternalID,
Data: []byte(arg.Data),
MatchingSchemaVersion: arg.MatchingSchemaVersion,
Metadata: []byte(arg.Metadata),
ContentHash: toOptionalPgText(arg.ContentHash),
NodeType: arg.NodeType,
ExternalID: arg.ExternalID,
Data: []byte(arg.Data),
Metadata: []byte(arg.Metadata),
ContentHash: toOptionalPgText(arg.ContentHash),
})
if err != nil {
return graph.Node{}, wrapNotFound(err)
Expand Down Expand Up @@ -231,6 +228,47 @@ func (r *GraphRepository) CleanupOldSnapshots(ctx context.Context, arg graph.Cle
})
}

// nodeWithSnapshotFields is a common shape shared by GetNodeWithLatestSnapshotRow and ListNodesRow.
//
// GetNodeWithLatestSnapshot and ListNodes each copy their query result into
// this struct field by field and pass it to toNodeWithLatestSnapshot, so the
// conversion to graph.NodeWithLatestSnapshot is written only once.
type nodeWithSnapshotFields struct {
// Node identity.
ID int64
ExternalID string
NodeType string
// Raw bytes; toNodeWithLatestSnapshot exposes them as json.RawMessage without decoding.
MatchingData []byte
DuplicateCount int32
// Nullable: converted to a nil *time.Time when Valid is false.
LastSeenTs pgtype.Timestamptz
// Only the Time component of CreatedAt/UpdatedAt is carried over; Valid is not consulted.
CreatedAt pgtype.Timestamptz
UpdatedAt pgtype.Timestamptz
// Raw bytes; exposed as json.RawMessage without decoding.
Metadata []byte
// Snapshot columns. SnapshotData is passed through as json.RawMessage.
SnapshotData []byte
SequenceNumber int32
// Nullable: converted to a nil *time.Time when Valid is false
// (presumably when the node has no snapshot yet; confirm against the query).
SnapshotCreatedAt pgtype.Timestamptz
}

// toNodeWithLatestSnapshot converts the shared row shape into the domain type
// graph.NodeWithLatestSnapshot.
//
// Nullable timestamps (LastSeenTs, SnapshotCreatedAt) become nil pointers when
// the database value is not valid; CreatedAt and UpdatedAt are copied as plain
// time values. Byte columns are wrapped as json.RawMessage without decoding.
func toNodeWithLatestSnapshot(r nodeWithSnapshotFields) graph.NodeWithLatestSnapshot {
	// optionalTime maps a nullable timestamp to a pointer, nil when not valid.
	optionalTime := func(ts pgtype.Timestamptz) *time.Time {
		if !ts.Valid {
			return nil
		}
		value := ts.Time
		return &value
	}

	var out graph.NodeWithLatestSnapshot

	// Node columns.
	out.ID = r.ID
	out.ExternalID = r.ExternalID
	out.NodeType = r.NodeType
	out.MatchingData = json.RawMessage(r.MatchingData)
	out.DuplicateCount = r.DuplicateCount
	out.LastSeenTs = optionalTime(r.LastSeenTs)
	out.CreatedAt = r.CreatedAt.Time
	out.UpdatedAt = r.UpdatedAt.Time
	out.Metadata = json.RawMessage(r.Metadata)

	// Latest-snapshot columns.
	out.SnapshotData = json.RawMessage(r.SnapshotData)
	out.SequenceNumber = r.SequenceNumber
	out.SnapshotCreatedAt = optionalTime(r.SnapshotCreatedAt)

	return out
}

// GetNodeWithLatestSnapshot implements graph.Repository.
func (r *GraphRepository) GetNodeWithLatestSnapshot(ctx context.Context, arg graph.GetNodeWithLatestSnapshotParams) (graph.NodeWithLatestSnapshot, error) {
result, err := r.queries.GetNodeWithLatestSnapshot(ctx, postgres.GetNodeWithLatestSnapshotParams{
Expand All @@ -240,31 +278,51 @@ func (r *GraphRepository) GetNodeWithLatestSnapshot(ctx context.Context, arg gra
if err != nil {
return graph.NodeWithLatestSnapshot{}, wrapNotFound(err)
}
return toNodeWithLatestSnapshot(nodeWithSnapshotFields{
ID: result.ID,
ExternalID: result.ExternalID,
NodeType: result.NodeType,
MatchingData: result.MatchingData,
DuplicateCount: result.DuplicateCount,
LastSeenTs: result.LastSeenTs,
CreatedAt: result.CreatedAt,
UpdatedAt: result.UpdatedAt,
Metadata: result.Metadata,
SnapshotData: result.SnapshotData,
SequenceNumber: result.SequenceNumber,
SnapshotCreatedAt: result.SnapshotCreatedAt,
}), nil
}

var lastSeen *time.Time
if result.LastSeenTs.Valid {
lastSeen = &result.LastSeenTs.Time
// ListNodes implements graph.Repository.
func (r *GraphRepository) ListNodes(ctx context.Context, arg graph.ListNodesParams) ([]graph.NodeWithLatestSnapshot, error) {
rows, err := r.queries.ListNodes(ctx, postgres.ListNodesParams{
NodeTypes: arg.NodeTypes,
MetadataFilter: []byte(arg.MetadataFilter),
Limit: arg.Limit,
Offset: arg.Offset,
})
if err != nil {
return nil, err
}
var snapshotCreatedAt *time.Time
if result.SnapshotCreatedAt.Valid {
snapshotCreatedAt = &result.SnapshotCreatedAt.Time
result := make([]graph.NodeWithLatestSnapshot, len(rows))
for i, row := range rows {
result[i] = toNodeWithLatestSnapshot(nodeWithSnapshotFields{
ID: row.ID,
ExternalID: row.ExternalID,
NodeType: row.NodeType,
MatchingData: row.MatchingData,
DuplicateCount: row.DuplicateCount,
LastSeenTs: row.LastSeenTs,
CreatedAt: row.CreatedAt,
UpdatedAt: row.UpdatedAt,
Metadata: row.Metadata,
SnapshotData: row.SnapshotData,
SequenceNumber: row.SequenceNumber,
SnapshotCreatedAt: row.SnapshotCreatedAt,
})
}

return graph.NodeWithLatestSnapshot{
ID: result.ID,
ExternalID: result.ExternalID,
NodeType: result.NodeType,
MatchingData: json.RawMessage(result.MatchingData),
DuplicateCount: result.DuplicateCount,
MatchingSchemaVersion: result.MatchingSchemaVersion,
LastSeenTs: lastSeen,
CreatedAt: result.CreatedAt.Time,
UpdatedAt: result.UpdatedAt.Time,
Metadata: json.RawMessage(result.Metadata),
SnapshotData: json.RawMessage(result.SnapshotData),
SequenceNumber: result.SequenceNumber,
SnapshotCreatedAt: snapshotCreatedAt,
}, nil
return result, nil
}

// GetSnapshotsByNode implements graph.Repository.
Expand All @@ -279,16 +337,3 @@ func (r *GraphRepository) GetSnapshotsByNode(ctx context.Context, arg graph.GetS
}
return toSnapshots(result), nil
}

// FindNodesNeedingSchemaUpdate implements graph.Repository.
//
// It forwards the matching schema version and the offset/limit paging values
// to the underlying query and converts the returned rows with toNodes. Query
// errors are returned unchanged together with a nil slice.
func (r *GraphRepository) FindNodesNeedingSchemaUpdate(ctx context.Context, arg graph.FindNodesNeedingSchemaUpdateParams) ([]graph.Node, error) {
	params := postgres.FindNodesNeedingSchemaUpdateParams{
		MatchingSchemaVersion: arg.MatchingSchemaVersion,
		Offset:                arg.Offset,
		Limit:                 arg.Limit,
	}

	rows, err := r.queries.FindNodesNeedingSchemaUpdate(ctx, params)
	if err != nil {
		return nil, err
	}

	return toNodes(rows), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ CREATE TABLE IF NOT EXISTS graph_nodes (
node_type TEXT NOT NULL,
data JSONB NOT NULL,
duplicate_count INTEGER DEFAULT 1 NOT NULL,
matching_schema_version INTEGER DEFAULT 1 NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,

Expand Down Expand Up @@ -46,8 +45,6 @@ CREATE INDEX IF NOT EXISTS idx_graph_nodes_type ON graph_nodes (node_type);
CREATE INDEX IF NOT EXISTS idx_graph_nodes_duplicate_count ON graph_nodes (duplicate_count);
CREATE INDEX IF NOT EXISTS idx_graph_nodes_data_gin ON graph_nodes USING GIN (data);
CREATE INDEX IF NOT EXISTS idx_graph_nodes_updated_at ON graph_nodes (updated_at);
CREATE INDEX IF NOT EXISTS idx_graph_nodes_schema_version ON graph_nodes (matching_schema_version);

-- Standard indexes for commonly searched fields (fuzzy matching now handled in application layer)
CREATE INDEX IF NOT EXISTS idx_graph_nodes_data_name ON graph_nodes ((data->>'name'));
CREATE INDEX IF NOT EXISTS idx_graph_nodes_data_serial ON graph_nodes ((data->>'serial'));
Expand All @@ -72,7 +69,6 @@ CREATE TRIGGER update_graph_nodes_updated_at
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Add comments for documentation
COMMENT ON COLUMN graph_nodes.matching_schema_version IS 'Version of the matching configuration used to extract matching attributes';
COMMENT ON COLUMN graph_node_snapshots.snapshot_data IS 'Complete entity protojson snapshot for historical tracking';
COMMENT ON COLUMN graph_node_snapshots.sequence_number IS 'Sequential number for ordering snapshots, higher is newer';
COMMENT ON COLUMN graph_edges.confidence_score IS 'Confidence score (0.0-1.0) for the relationship match';
Expand All @@ -83,7 +79,6 @@ COMMENT ON COLUMN graph_edges.edge_subtype IS 'Subtype of edge based on confiden
-- +goose Down

-- Remove comments
COMMENT ON COLUMN graph_nodes.matching_schema_version IS NULL;
COMMENT ON COLUMN graph_node_snapshots.snapshot_data IS NULL;
COMMENT ON COLUMN graph_node_snapshots.sequence_number IS NULL;
COMMENT ON COLUMN graph_edges.confidence_score IS NULL;
Expand All @@ -109,7 +104,6 @@ DROP INDEX IF EXISTS idx_graph_node_snapshots_node_id;

DROP INDEX IF EXISTS idx_graph_nodes_data_serial;
DROP INDEX IF EXISTS idx_graph_nodes_data_name;
DROP INDEX IF EXISTS idx_graph_nodes_schema_version;
DROP INDEX IF EXISTS idx_graph_nodes_updated_at;
DROP INDEX IF EXISTS idx_graph_nodes_data_gin;
DROP INDEX IF EXISTS idx_graph_nodes_duplicate_count;
Expand Down
Loading
Loading