Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix the issue related to services.
- Fix the snowflake plugin
- Fix the export method
- Recursive component deletion overeagerly skipping components

## [0.5.0](https://github.com/superduper-io/superduper/compare/0.5.0...0.4.0]) (2024-Nov-02)

Expand Down
8 changes: 7 additions & 1 deletion superduper/backends/base/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ def drop_component(self, component: str, identifier: str):
tool_id = self.uuid_tool_mapping[uuid]
tool_ids.append(tool_id)
del self.uuid_tool_mapping[uuid]
self.tool_uuid_mapping[tool_id].remove(uuid)
try:
self.tool_uuid_mapping[tool_id].remove(uuid)
except KeyError:
logging.warn(
f"KeyError: {tool_id} -> {uuid} not found in tool_uuid_mapping"
)
continue
if not self.tool_uuid_mapping[tool_id]:
self.tools[tool_id].drop()
del self.tools[tool_id]
Expand Down
6 changes: 2 additions & 4 deletions superduper/backends/local/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ def list_uuids(self):
return list(self._trigger_uuid_mapping.values())

def put_component(self, component: str, uuid: str):
self.triggers.add((component, uuid))
pass

def drop_component(self, component, identifier):
c = self.db.load(component=component, identifier=identifier)
if isinstance(c, CDC):
self.triggers.remove(c.cdc_table)
pass

def initialize(self):
"""Initialize the CDC."""
Expand Down
95 changes: 70 additions & 25 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import time
import typing as t
from collections import namedtuple
from collections import defaultdict, namedtuple

import click
import networkx as nx

import superduper as s
from superduper import CFG, logging
Expand Down Expand Up @@ -368,6 +368,53 @@ def apply(
)
return result

def _filter_deletions_by_cascade(self, events: t.List[Delete]):
"""
Filter deletions by cascade.

:param events: List of events to filter.
"""
all_huuids = set(e.huuid for e in events)
lookup = {e.huuid: e for e in events}

graph = nx.DiGraph()

for e in events:
graph.add_node(e.huuid)
for dep in e.parents:
if dep in all_huuids:
graph.add_edge(dep, e.huuid)

# sort events by graph
sorted_nodes = nx.topological_sort(graph)
events = [lookup[n] for n in sorted_nodes if n in all_huuids]

conflicting_events = [
e.huuid for e in events if not set(e.parents).issubset(all_huuids)
]
potential_breakages = defaultdict(list)
if conflicting_events:
for e in conflicting_events:
potential_breakages[e].extend(
[x for x in lookup[e].parents if x not in all_huuids]
)

all_conflicts = conflicting_events[:]

if conflicting_events:
for e in conflicting_events:
# find descendants downstream from the event
downstream = list(nx.descendants(graph, e))
all_conflicts.extend(downstream)

all_conflicts = set(all_conflicts)

events = [e for e in events if e.huuid not in all_conflicts]

non_table_events = [e for e in events if e.component != 'Table']
table_events = [e for e in events if e.component == 'Table']
return non_table_events + table_events, potential_breakages

def remove(
self,
component: str,
Expand All @@ -385,28 +432,35 @@ def remove(
:param force: Toggle to force remove the component.
"""
events: t.List[Delete] = []
failed: t.List[str] = []
self._build_remove(
component=component,
identifier=identifier,
events=events,
failed=failed,
recursive=recursive,
)

if failed and not force:
events = list({e.huuid: e for e in events}.values()) # remove duplicates

filtered_events, potential_breakages = self._filter_deletions_by_cascade(events)

if potential_breakages and not force:
msg = '\n' + '\n'.join(
' ' + f'{k} -> {v}' for k, v in potential_breakages.items()
)
raise exceptions.Conflict(
component, identifier, f"the following components are in use: {failed}"
component,
identifier,
f"the following components are using some components scheduler for deletion: {msg}",
)

for i, e in enumerate(events):
for i, e in enumerate(filtered_events):
logging.info(
f'Removing component [{i + 1}/{len(events)}] '
f'Removing component [{i + 1}/{len(filtered_events)}] '
f'{e.component}:{e.identifier}'
)
e.execute(self)
logging.info(
f'Removing component [{i + 1}/{len(events)}] '
f'Removing component [{i + 1}/{len(filtered_events)}] '
f'{e.component}:{e.identifier}... DONE'
)

Expand All @@ -415,30 +469,22 @@ def _build_remove(
component: str,
identifier: str,
events: t.List,
failed: t.List,
recursive: bool = False,
):

object = self.load(component=component, identifier=identifier)

previous = [e.huuid for e in events]

parents = self.metadata.get_component_parents(
component=component, identifier=identifier
)
fail = False
if parents:
# Only fail the deletion attempt if the parents aren't in this cascade
for p in parents:
if f'{p[0]}:{p[1]}' not in previous:
failed.append(f'{component}:{identifier} -> {p[0]}:{p[1]}')
fail = True

# If the deletion fails, we need to stop
if fail:
return

events.append(Delete(component=component, identifier=identifier))
events.append(
Delete(
component=component,
identifier=identifier,
parents=[':'.join(p) for p in parents],
)
)

if recursive:
children = object.get_children()
Expand All @@ -448,7 +494,6 @@ def _build_remove(
c.identifier,
recursive=True,
events=events,
failed=failed,
)

def load_all(self, component: str, **kwargs) -> t.List[Component]:
Expand Down
27 changes: 17 additions & 10 deletions superduper/base/event.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses as dc
import math
import typing as t
from abc import abstractmethod
Expand Down Expand Up @@ -370,12 +371,14 @@ class Delete(Event):

:param component: the type of component to be created
:param identifier: the identifier of the component to be deleted
:param parents: the parents of the component (if any)
"""

queue: t.ClassVar[str] = '_apply'

component: str
identifier: str
parents: t.List[str] = dc.field(default_factory=list)

@property
def huuid(self):
Expand All @@ -389,7 +392,7 @@ def execute(self, db: 'Datalayer'):
"""
try:
object = db.load(component=self.component, identifier=self.identifier)

object.cleanup()
db.metadata.delete_component(self.component, self.identifier)
artifact_ids = db.metadata.get_artifact_relations_for_component(
self.component, self.identifier
Expand All @@ -415,16 +418,20 @@ def execute(self, db: 'Datalayer'):
parent_identifier=self.identifier,
)

object.cleanup()

except Exception as e:
db.metadata.set_component_failed(
component=self.component,
uuid=self.identifier,
reason=f'Failed to delete: {str(e)}',
message=str(format_exc()),
context=None,
)
try:
db.metadata.set_component_failed(
component=self.component,
uuid=object.uuid,
reason=f'Failed to delete: {str(e)}',
message=str(format_exc()),
context=None,
)
except Exception as ee:
logging.error(
f'Failed to set component status: {str(ee)}'
f'while deleting {self.component}:{self.identifier}'
)
raise e


Expand Down
6 changes: 6 additions & 0 deletions superduper/components/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def dependencies(self):
return [tuple(['Listener'] + list(self.cdc_table.split('__')[1:]))]
return []

def cleanup(self):
"""Cleanup the component."""
super().cleanup()
self.db.cluster.cdc.drop_component(self.component, self.identifier)
self.db.cluster.scheduler.drop_component(self.component, self.identifier)


def _get_parent_cdcs_of_component(component, db: 'Datalayer'):
parents = db.metadata.get_component_version_parents(component.uuid)
Expand Down
3 changes: 2 additions & 1 deletion superduper/components/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def create_table_events(self):

def cleanup(self):
"""Cleanup the table, on removal of the component."""
self.db.databackend.drop_table(self.identifier)
if self.identifier.startswith(CFG.output_prefix):
self.db.databackend.drop_table(self.identifier)

@trigger('apply', requires='data')
def add_data(self):
Expand Down
Loading