|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -from collections.abc import Collection, Generator, Iterable, Sequence |
| 3 | +from collections.abc import Collection, Generator, Sequence |
4 | 4 | from contextlib import contextmanager |
5 | | -from dataclasses import dataclass |
6 | 5 |
|
7 | 6 | from django.test import override_settings |
8 | 7 |
|
9 | 8 | from sentry.types.region import Locality, Region, RegionDirectory, get_global_directory |
10 | 9 |
|
11 | 10 |
|
12 | | -@dataclass(frozen=True) |
13 | | -class _TemporaryRegionDirectoryState: |
14 | | - regions: frozenset[Region] |
15 | | - default_region: Region |
16 | | - |
17 | | - |
18 | 11 | class TestEnvRegionDirectory(RegionDirectory): |
19 | 12 | __test__ = False |
20 | 13 |
|
21 | 14 | def __init__(self, regions: Collection[Region]) -> None: |
22 | | - super().__init__(regions, self.localities_from_regions(regions)) |
23 | | - self._tmp_state = _TemporaryRegionDirectoryState( |
24 | | - regions=super().regions, default_region=next(iter(regions)) |
25 | | - ) |
| 15 | + super().__init__(regions, frozenset()) |
| 16 | + self._apply_regions(regions) |
26 | 17 |
|
27 | | - def localities_from_regions(self, regions: Collection[Region]) -> frozenset[Locality]: |
28 | | - return frozenset( |
29 | | - Locality(name=cell.name, cells=frozenset([cell.name]), category=cell.category) |
30 | | - for cell in regions |
| 18 | + def _apply_regions(self, regions: Collection[Region]) -> None: |
| 19 | + localities = frozenset( |
| 20 | + Locality(name=r.name, cells=frozenset([r.name]), category=r.category, visible=r.visible) |
| 21 | + for r in regions |
31 | 22 | ) |
| 23 | + self._cells = frozenset(regions) |
| 24 | + self._by_name = {r.name: r for r in self._cells} |
| 25 | + self._localities = localities |
| 26 | + self._localities_by_name = {loc.name: loc for loc in localities} |
| 27 | + self._cell_to_locality = {cell_name: loc for loc in localities for cell_name in loc.cells} |
32 | 28 |
|
33 | 29 | @contextmanager |
34 | 30 | def swap_state( |
35 | 31 | self, |
36 | 32 | regions: Sequence[Region], |
37 | 33 | local_region: Region | None = None, |
38 | 34 | ) -> Generator[None]: |
39 | | - monolith_region = regions[0] |
40 | | - new_regions = self.regions if regions is None else frozenset(regions) |
41 | | - new_state = _TemporaryRegionDirectoryState( |
42 | | - regions=new_regions, |
43 | | - default_region=local_region or monolith_region, |
| 35 | + prev_state = ( |
| 36 | + self._cells, |
| 37 | + self._by_name, |
| 38 | + self._localities, |
| 39 | + self._localities_by_name, |
| 40 | + self._cell_to_locality, |
44 | 41 | ) |
45 | | - |
46 | | - old_state = self._tmp_state |
47 | 42 | try: |
48 | | - self._tmp_state = new_state |
49 | | - |
| 43 | + self._apply_regions(regions) |
| 44 | + monolith_region = regions[0] |
50 | 45 | with override_settings(SENTRY_MONOLITH_REGION=monolith_region.name): |
51 | 46 | if local_region: |
52 | 47 | with override_settings(SENTRY_REGION=local_region.name): |
53 | 48 | yield |
54 | 49 | else: |
55 | 50 | yield |
56 | 51 | finally: |
57 | | - self._tmp_state = old_state |
| 52 | + ( |
| 53 | + self._cells, |
| 54 | + self._by_name, |
| 55 | + self._localities, |
| 56 | + self._localities_by_name, |
| 57 | + self._cell_to_locality, |
| 58 | + ) = prev_state |
58 | 59 |
|
59 | 60 | @contextmanager |
60 | 61 | def swap_to_default_region(self) -> Generator[None]: |
61 | 62 | """Swap to an arbitrary region when entering region mode.""" |
62 | | - with override_settings(SENTRY_REGION=self._tmp_state.default_region.name): |
| 63 | + region = next(iter(self._cells)) |
| 64 | + with override_settings(SENTRY_REGION=region.name): |
63 | 65 | yield |
64 | 66 |
|
65 | 67 | @contextmanager |
66 | 68 | def swap_to_region_by_name(self, region_name: str) -> Generator[None]: |
67 | 69 | """Swap to the specified region when entering region mode.""" |
68 | | - |
69 | 70 | region = self.get_cell_by_name(region_name) |
70 | 71 | if region is None: |
71 | 72 | raise Exception("specified swap region not found") |
72 | 73 | with override_settings(SENTRY_REGION=region.name): |
73 | 74 | yield |
74 | 75 |
|
75 | | - @property |
76 | | - def regions(self) -> frozenset[Region]: |
77 | | - return self._tmp_state.regions |
78 | | - |
79 | | - @property |
80 | | - def localities(self) -> frozenset[Locality]: |
81 | | - return frozenset( |
82 | | - Locality( |
83 | | - name=r.name, |
84 | | - cells=frozenset([r.name]), |
85 | | - category=r.category, |
86 | | - visible=r.visible, |
87 | | - ) |
88 | | - for r in self._tmp_state.regions |
89 | | - ) |
90 | | - |
91 | | - def get_cell_by_name(self, region_name: str) -> Region | None: |
92 | | - match = (r for r in self._tmp_state.regions if r.name == region_name) |
93 | | - try: |
94 | | - return next(match) |
95 | | - except StopIteration: |
96 | | - return None |
97 | | - |
98 | | - def get_locality_by_name(self, locality_name: str) -> Locality | None: |
99 | | - region = self.get_cell_by_name(locality_name) |
100 | | - if region is None: |
101 | | - return None |
102 | | - return Locality( |
103 | | - name=locality_name, |
104 | | - cells=frozenset([locality_name]), |
105 | | - category=region.category, |
106 | | - visible=region.visible, |
107 | | - ) |
108 | | - |
109 | | - def get_locality_for_cell(self, cell_name: str) -> Locality | None: |
110 | | - region = self.get_cell_by_name(cell_name) |
111 | | - if region is None: |
112 | | - return None |
113 | | - return Locality( |
114 | | - name=cell_name, |
115 | | - cells=frozenset([cell_name]), |
116 | | - category=region.category, |
117 | | - visible=region.visible, |
118 | | - ) |
119 | | - |
120 | | - def get_cells_for_locality(self, locality_name: str) -> Iterable[Region]: |
121 | | - region = self.get_cell_by_name(locality_name) |
122 | | - if region is None: |
123 | | - return () |
124 | | - return (region,) |
125 | | - |
126 | 76 |
|
127 | 77 | def get_test_env_directory() -> TestEnvRegionDirectory: |
128 | 78 | directory = get_global_directory() |
|
0 commit comments