Skip to content

Commit e12eafa

Browse files
committed
Remove component
1 parent aa57262 commit e12eafa

File tree

7 files changed

+105
-41
lines changed

7 files changed

+105
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7979
- Fix the issue related to services.
8080
- Fix the snowflake plugin
8181
- Fix the export method
82+
- Fix recursive component deletion overeagerly skipping components
8283

8384
## [0.5.0](https://github.com/superduper-io/superduper/compare/0.5.0...0.4.0) (2024-Nov-02)
8485

superduper/backends/base/backends.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,13 @@ def drop_component(self, component: str, identifier: str):
9595
tool_id = self.uuid_tool_mapping[uuid]
9696
tool_ids.append(tool_id)
9797
del self.uuid_tool_mapping[uuid]
98-
self.tool_uuid_mapping[tool_id].remove(uuid)
98+
try:
99+
self.tool_uuid_mapping[tool_id].remove(uuid)
100+
except KeyError:
101+
logging.warn(
102+
f"KeyError: {tool_id} -> {uuid} not found in tool_uuid_mapping"
103+
)
104+
continue
99105
if not self.tool_uuid_mapping[tool_id]:
100106
self.tools[tool_id].drop()
101107
del self.tools[tool_id]

superduper/backends/local/cdc.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@ def list_uuids(self):
3333
return list(self._trigger_uuid_mapping.values())
3434

3535
def put_component(self, component: str, uuid: str):
36-
self.triggers.add((component, uuid))
36+
pass
3737

3838
def drop_component(self, component, identifier):
39-
c = self.db.load(component=component, identifier=identifier)
40-
if isinstance(c, CDC):
41-
self.triggers.remove(c.cdc_table)
39+
pass
4240

4341
def initialize(self):
4442
"""Initialize the CDC."""

superduper/base/datalayer.py

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import json
21
import time
32
import typing as t
4-
from collections import namedtuple
3+
from collections import defaultdict, namedtuple
54

65
import click
6+
import networkx as nx
77

88
import superduper as s
99
from superduper import CFG, logging
@@ -368,6 +368,53 @@ def apply(
368368
)
369369
return result
370370

371+
def _filter_deletions_by_cascade(self, events: t.List[Delete]):
    """
    Order deletion events and withhold those that cannot be removed safely.

    An event is withheld when at least one of its ``parents`` is not itself
    part of ``events``; everything downstream of such an event in the
    dependency graph is withheld as well.

    :param events: List of events to filter.
    :return: A tuple ``(ordered_events, potential_breakages)``.
             ``ordered_events`` holds the remaining events in topological
             order (parents before the components they point to), with all
             ``'Table'`` events moved to the end. ``potential_breakages``
             maps the ``huuid`` of each withheld event to the parents that
             are missing from ``events``.
    """
    by_huuid = {ev.huuid: ev for ev in events}
    scheduled = set(by_huuid)

    # Edges run parent -> child, restricted to parents that are also being
    # deleted. Nodes and edges are inserted event by event so the graph's
    # insertion order (which drives tie-breaking in the sort) is unchanged.
    graph = nx.DiGraph()
    for ev in events:
        graph.add_node(ev.huuid)
        graph.add_edges_from(
            (parent, ev.huuid) for parent in ev.parents if parent in scheduled
        )

    # Every node in the graph is a scheduled huuid, so no extra filtering
    # is required when mapping back to events.
    ordered = [by_huuid[huuid] for huuid in nx.topological_sort(graph)]

    # Record, per event, the parents which are not part of this cascade.
    potential_breakages = defaultdict(list)
    for ev in ordered:
        missing = [parent for parent in ev.parents if parent not in scheduled]
        if missing:
            potential_breakages[ev.huuid].extend(missing)

    # A withheld event also withholds everything reachable from it.
    blocked = set(potential_breakages)
    for huuid in potential_breakages:
        blocked |= nx.descendants(graph, huuid)

    survivors = [ev for ev in ordered if ev.huuid not in blocked]

    # Stable sort: relative order is kept, 'Table' events go last.
    survivors = sorted(survivors, key=lambda ev: ev.component == 'Table')
    return survivors, potential_breakages
417+
371418
def remove(
372419
self,
373420
component: str,
@@ -385,28 +432,35 @@ def remove(
385432
:param force: Toggle to force remove the component.
386433
"""
387434
events: t.List[Delete] = []
388-
failed: t.List[str] = []
389435
self._build_remove(
390436
component=component,
391437
identifier=identifier,
392438
events=events,
393-
failed=failed,
394439
recursive=recursive,
395440
)
396441

397-
if failed and not force:
442+
events = list({e.huuid: e for e in events}.values()) # remove duplicates
443+
444+
filtered_events, potential_breakages = self._filter_deletions_by_cascade(events)
445+
446+
if potential_breakages and not force:
447+
msg = '\n' + '\n'.join(
448+
' ' + f'{k} -> {v}' for k, v in potential_breakages.items()
449+
)
398450
raise exceptions.Conflict(
399-
component, identifier, f"the following components are in use: {failed}"
451+
component,
452+
identifier,
453+
f"the following components are using some components scheduler for deletion: {msg}",
400454
)
401455

402-
for i, e in enumerate(events):
456+
for i, e in enumerate(filtered_events):
403457
logging.info(
404-
f'Removing component [{i + 1}/{len(events)}] '
458+
f'Removing component [{i + 1}/{len(filtered_events)}] '
405459
f'{e.component}:{e.identifier}'
406460
)
407461
e.execute(self)
408462
logging.info(
409-
f'Removing component [{i + 1}/{len(events)}] '
463+
f'Removing component [{i + 1}/{len(filtered_events)}] '
410464
f'{e.component}:{e.identifier}... DONE'
411465
)
412466

@@ -415,30 +469,22 @@ def _build_remove(
415469
component: str,
416470
identifier: str,
417471
events: t.List,
418-
failed: t.List,
419472
recursive: bool = False,
420473
):
421474

422475
object = self.load(component=component, identifier=identifier)
423476

424-
previous = [e.huuid for e in events]
425-
426477
parents = self.metadata.get_component_parents(
427478
component=component, identifier=identifier
428479
)
429-
fail = False
430-
if parents:
431-
# Only fail the deletion attempt if the parents aren't in this cascade
432-
for p in parents:
433-
if f'{p[0]}:{p[1]}' not in previous:
434-
failed.append(f'{component}:{identifier} -> {p[0]}:{p[1]}')
435-
fail = True
436-
437-
# If the deletion fails, we need to stop
438-
if fail:
439-
return
440480

441-
events.append(Delete(component=component, identifier=identifier))
481+
events.append(
482+
Delete(
483+
component=component,
484+
identifier=identifier,
485+
parents=[':'.join(p) for p in parents],
486+
)
487+
)
442488

443489
if recursive:
444490
children = object.get_children()
@@ -448,7 +494,6 @@ def _build_remove(
448494
c.identifier,
449495
recursive=True,
450496
events=events,
451-
failed=failed,
452497
)
453498

454499
def load_all(self, component: str, **kwargs) -> t.List[Component]:

superduper/base/event.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import dataclasses as dc
12
import math
23
import typing as t
34
from abc import abstractmethod
@@ -370,12 +371,14 @@ class Delete(Event):
370371
371372
:param component: the type of component to be created
372373
:param identifier: the identifier of the component to be deleted
374+
:param parents: the parents of the component (if any)
373375
"""
374376

375377
queue: t.ClassVar[str] = '_apply'
376378

377379
component: str
378380
identifier: str
381+
parents: t.List[str] = dc.field(default_factory=list)
379382

380383
@property
381384
def huuid(self):
@@ -389,7 +392,7 @@ def execute(self, db: 'Datalayer'):
389392
"""
390393
try:
391394
object = db.load(component=self.component, identifier=self.identifier)
392-
395+
object.cleanup()
393396
db.metadata.delete_component(self.component, self.identifier)
394397
artifact_ids = db.metadata.get_artifact_relations_for_component(
395398
self.component, self.identifier
@@ -415,16 +418,20 @@ def execute(self, db: 'Datalayer'):
415418
parent_identifier=self.identifier,
416419
)
417420

418-
object.cleanup()
419-
420421
except Exception as e:
421-
db.metadata.set_component_failed(
422-
component=self.component,
423-
uuid=self.identifier,
424-
reason=f'Failed to delete: {str(e)}',
425-
message=str(format_exc()),
426-
context=None,
427-
)
422+
try:
423+
db.metadata.set_component_failed(
424+
component=self.component,
425+
uuid=object.uuid,
426+
reason=f'Failed to delete: {str(e)}',
427+
message=str(format_exc()),
428+
context=None,
429+
)
430+
except Exception as ee:
431+
logging.error(
432+
f'Failed to set component status: {str(ee)}'
433+
f'while deleting {self.component}:{self.identifier}'
434+
)
428435
raise e
429436

430437

superduper/components/cdc.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ def dependencies(self):
3333
return [tuple(['Listener'] + list(self.cdc_table.split('__')[1:]))]
3434
return []
3535

36+
def cleanup(self):
    """Cleanup the component.

    Runs the parent class cleanup first, then calls ``drop_component`` on
    the cluster's ``cdc`` and ``scheduler`` (in that order) for this
    component/identifier pair.
    """
    super().cleanup()
    cluster = self.db.cluster
    cluster.cdc.drop_component(self.component, self.identifier)
    cluster.scheduler.drop_component(self.component, self.identifier)
41+
3642

3743
def _get_parent_cdcs_of_component(component, db: 'Datalayer'):
3844
parents = db.metadata.get_component_version_parents(component.uuid)

superduper/components/table.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ def create_table_events(self):
5555

5656
def cleanup(self):
5757
"""Cleanup the table, on removal of the component."""
58-
self.db.databackend.drop_table(self.identifier)
58+
if self.identifier.startswith(CFG.output_prefix):
59+
self.db.databackend.drop_table(self.identifier)
5960

6061
@trigger('apply', requires='data')
6162
def add_data(self):

0 commit comments

Comments
 (0)