Skip to content

Commit b91f278

Browse files
Add 7 tests analogous to JS/Java reference tests; _initial_list now fires ADDED/MODIFIED/DELETED
Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com>
1 parent 5a8fef1 commit b91f278

File tree

2 files changed

+287
-7
lines changed

2 files changed

+287
-7
lines changed

kubernetes/informer/informer.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,45 @@ def _fire(self, event_type, obj):
184184
)
185185

186186
def _initial_list(self):
187-
"""Do the initial list and populate the cache."""
187+
"""List all objects and populate the cache, firing ADDED/MODIFIED/DELETED events.
188+
189+
On the first call (empty cache) every returned item fires ADDED.
190+
On subsequent calls (resync or after a 410 Gone) the new list is
191+
diffed against the existing cache:
192+
* Items absent from the new list fire DELETED.
193+
* Items present in both fire MODIFIED.
194+
* Items only in the new list fire ADDED.
195+
"""
188196
kw = self._build_kwargs()
189197
resp = self._list_func(**kw)
190198
items = getattr(resp, "items", []) or []
199+
200+
# Build key → item map for incoming items.
201+
new_items_map = {}
202+
for item in items:
203+
key = self._cache._key_func(item)
204+
new_items_map[key] = item
205+
206+
# Snapshot the old keys before replacing the cache.
207+
old_keys = set(self._cache.list_keys())
208+
209+
# Fire DELETED for items no longer present in the new list.
210+
for key in old_keys:
211+
if key not in new_items_map:
212+
old_obj = self._cache.get_by_key(key)
213+
if old_obj is not None:
214+
self._fire(DELETED, old_obj)
215+
216+
# Atomically replace the cache.
191217
self._cache._replace_all(items)
218+
219+
# Fire ADDED for genuinely new items, MODIFIED for existing ones.
220+
for key, item in new_items_map.items():
221+
if key in old_keys:
222+
self._fire(MODIFIED, item)
223+
else:
224+
self._fire(ADDED, item)
225+
192226
rv = None
193227
meta = getattr(resp, "metadata", None)
194228
if meta is not None:
@@ -248,9 +282,8 @@ def _run_loop(self):
248282
self._fire(BOOKMARK, event.get("raw_object", obj))
249283
elif evt_type == ERROR:
250284
self._fire(ERROR, obj)
251-
# Periodic resync: full re-list from the API server, then
252-
# fire MODIFIED for every cached object so reconciliation
253-
# loops receive a fresh notification.
285+
# Periodic resync: full re-list from the API server, firing
286+
# ADDED/MODIFIED/DELETED for any changes since the last list.
254287
if (
255288
self._resync_period > 0
256289
and (time.monotonic() - last_resync) >= self._resync_period
@@ -261,9 +294,6 @@ def _run_loop(self):
261294
except Exception as exc:
262295
logger.exception("Error during resync list; continuing")
263296
self._fire(ERROR, exc)
264-
else:
265-
for cached_obj in self._cache.list():
266-
self._fire(MODIFIED, cached_obj)
267297
last_resync = time.monotonic()
268298
except ApiException as exc:
269299
if exc.status == 410:

kubernetes/test/test_informer.py

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,256 @@ def fake_stream(func, **kw):
395395
# list_func called once for the initial list + once for the resync = 2
396396
self.assertEqual(list_func.call_count, 2)
397397

398+
# ------------------------------------------------------------------
399+
# Tests analogous to the JavaScript cache_test.ts and Java
400+
# DefaultSharedIndexInformerWireMockTest scenarios.
401+
# ------------------------------------------------------------------
402+
403+
def test_multiple_handlers_all_fire(self):
404+
"""All handlers registered for the same event type must be invoked."""
405+
pod = _make_pod("default", "multi-pod")
406+
received1 = []
407+
received2 = []
408+
409+
list_func = MagicMock()
410+
list_resp = MagicMock()
411+
list_resp.items = []
412+
list_resp.metadata = MagicMock(resource_version="1")
413+
list_func.return_value = list_resp
414+
415+
informer = SharedInformer(list_func=list_func)
416+
informer.add_event_handler(ADDED, received1.append)
417+
informer.add_event_handler(ADDED, received2.append)
418+
419+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
420+
mock_w = MagicMock()
421+
422+
def fake_stream(func, **kw):
423+
yield {"type": "ADDED", "object": pod}
424+
informer._stop_event.set()
425+
426+
mock_w.stream.side_effect = fake_stream
427+
MockWatch.return_value = mock_w
428+
429+
informer.start()
430+
informer._thread.join(timeout=3)
431+
432+
self.assertEqual(received1, [pod])
433+
self.assertEqual(received2, [pod])
434+
435+
def test_selectors_and_namespace_forwarded(self):
436+
"""namespace, label_selector, and field_selector are forwarded to list_func
437+
and Watch.stream kwargs."""
438+
list_func = MagicMock()
439+
list_resp = MagicMock()
440+
list_resp.items = []
441+
list_resp.metadata = MagicMock(resource_version="1")
442+
list_func.return_value = list_resp
443+
444+
informer = SharedInformer(
445+
list_func=list_func,
446+
namespace="kube-system",
447+
label_selector="app=myapp",
448+
field_selector="status.phase=Running",
449+
)
450+
451+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
452+
mock_w = MagicMock()
453+
mock_w.resource_version = "1"
454+
455+
def fake_stream(func, **kw):
456+
informer._stop_event.set()
457+
return iter([])
458+
459+
mock_w.stream.side_effect = fake_stream
460+
MockWatch.return_value = mock_w
461+
462+
informer.start()
463+
informer._thread.join(timeout=3)
464+
465+
# Initial list call must include all selectors.
466+
list_func.assert_called_once_with(
467+
namespace="kube-system",
468+
label_selector="app=myapp",
469+
field_selector="status.phase=Running",
470+
)
471+
# Watch.stream must also receive them.
472+
_, stream_kw = mock_w.stream.call_args
473+
self.assertEqual(stream_kw.get("namespace"), "kube-system")
474+
self.assertEqual(stream_kw.get("label_selector"), "app=myapp")
475+
self.assertEqual(stream_kw.get("field_selector"), "status.phase=Running")
476+
477+
def test_watch_resource_version_passed_after_initial_list(self):
478+
"""After the initial list, Watch.stream is called with that list's resourceVersion."""
479+
list_func = MagicMock()
480+
list_resp = MagicMock()
481+
list_resp.items = []
482+
list_resp.metadata = MagicMock(resource_version="42")
483+
list_func.return_value = list_resp
484+
485+
informer = SharedInformer(list_func=list_func)
486+
487+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
488+
mock_w = MagicMock()
489+
mock_w.resource_version = "42"
490+
491+
def fake_stream(func, **kw):
492+
informer._stop_event.set()
493+
return iter([])
494+
495+
mock_w.stream.side_effect = fake_stream
496+
MockWatch.return_value = mock_w
497+
498+
informer.start()
499+
informer._thread.join(timeout=3)
500+
501+
_, stream_kw = mock_w.stream.call_args
502+
self.assertEqual(stream_kw.get("resource_version"), "42")
503+
504+
def test_non_410_api_exception_reconnects_without_relist(self):
505+
"""A non-410 ApiException fires ERROR and reconnects without calling list_func again."""
506+
from kubernetes.client.exceptions import ApiException
507+
508+
list_func = MagicMock()
509+
list_resp = MagicMock()
510+
list_resp.items = []
511+
list_resp.metadata = MagicMock(resource_version="1")
512+
list_func.return_value = list_resp
513+
514+
error_received = []
515+
informer = SharedInformer(list_func=list_func)
516+
informer.add_event_handler(ERROR, error_received.append)
517+
518+
stream_calls = {"n": 0}
519+
520+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
521+
mock_w = MagicMock()
522+
mock_w.resource_version = "1"
523+
524+
def fake_stream(func, **kw):
525+
stream_calls["n"] += 1
526+
if stream_calls["n"] == 1:
527+
raise ApiException(status=409, reason="Conflict")
528+
informer._stop_event.set()
529+
return iter([])
530+
531+
mock_w.stream.side_effect = fake_stream
532+
MockWatch.return_value = mock_w
533+
534+
informer.start()
535+
informer._thread.join(timeout=3)
536+
537+
# ERROR fires once for the 409; list_func not called a second time.
538+
self.assertEqual(len(error_received), 1)
539+
self.assertIsInstance(error_received[0], ApiException)
540+
self.assertEqual(error_received[0].status, 409)
541+
self.assertEqual(list_func.call_count, 1)
542+
self.assertEqual(stream_calls["n"], 2)
543+
544+
def test_list_func_error_fires_error_handler(self):
545+
"""If the list function raises an exception the ERROR handler is called."""
546+
from kubernetes.client.exceptions import ApiException
547+
548+
def always_fails(**kw):
549+
raise ApiException(status=403, reason="Forbidden")
550+
551+
error_received = []
552+
informer = SharedInformer(list_func=always_fails)
553+
554+
def on_error(exc):
555+
error_received.append(exc)
556+
informer._stop_event.set() # stop after first error so the test is fast
557+
558+
informer.add_event_handler(ERROR, on_error)
559+
560+
with patch("kubernetes.informer.informer.Watch"):
561+
informer.start()
562+
informer._thread.join(timeout=3)
563+
564+
self.assertEqual(len(error_received), 1)
565+
self.assertIsInstance(error_received[0], ApiException)
566+
self.assertEqual(error_received[0].status, 403)
567+
568+
def test_initial_list_fires_added_for_each_item(self):
569+
"""Items returned by the initial list must each fire an ADDED event."""
570+
pod1 = _make_pod("default", "pod1")
571+
pod2 = _make_pod("default", "pod2")
572+
573+
list_func = MagicMock()
574+
list_resp = MagicMock()
575+
list_resp.items = [pod1, pod2]
576+
list_resp.metadata = MagicMock(resource_version="5")
577+
list_func.return_value = list_resp
578+
579+
received = []
580+
informer = SharedInformer(list_func=list_func)
581+
informer.add_event_handler(ADDED, received.append)
582+
583+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
584+
mock_w = MagicMock()
585+
mock_w.resource_version = "5"
586+
587+
def fake_stream(func, **kw):
588+
informer._stop_event.set()
589+
return iter([])
590+
591+
mock_w.stream.side_effect = fake_stream
592+
MockWatch.return_value = mock_w
593+
594+
informer.start()
595+
informer._thread.join(timeout=3)
596+
597+
self.assertIn(pod1, received)
598+
self.assertIn(pod2, received)
599+
self.assertEqual(len(received), 2)
600+
601+
def test_relist_after_410_fires_delete_for_removed_items(self):
602+
"""After a 410-triggered re-list, items absent from the new list fire DELETED."""
603+
from kubernetes.client.exceptions import ApiException
604+
605+
pod_keep = _make_pod("default", "pod-keep")
606+
pod_delete = _make_pod("default", "pod-delete")
607+
608+
list_call = {"n": 0}
609+
610+
def list_func(**kw):
611+
list_call["n"] += 1
612+
resp = MagicMock()
613+
if list_call["n"] == 1:
614+
resp.items = [pod_keep, pod_delete]
615+
else:
616+
resp.items = [pod_keep] # pod_delete is gone after 410 re-list
617+
resp.metadata = MagicMock(resource_version=str(list_call["n"]))
618+
return resp
619+
620+
deleted = []
621+
informer = SharedInformer(list_func=list_func)
622+
informer.add_event_handler(DELETED, deleted.append)
623+
624+
stream_calls = {"n": 0}
625+
626+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
627+
mock_w = MagicMock()
628+
mock_w.resource_version = "1"
629+
630+
def fake_stream(func, **kw):
631+
stream_calls["n"] += 1
632+
if stream_calls["n"] == 1:
633+
raise ApiException(status=410, reason="Gone")
634+
informer._stop_event.set()
635+
return iter([])
636+
637+
mock_w.stream.side_effect = fake_stream
638+
MockWatch.return_value = mock_w
639+
640+
informer.start()
641+
informer._thread.join(timeout=3)
642+
643+
self.assertIn(pod_delete, deleted)
644+
self.assertNotIn(pod_keep, deleted)
645+
self.assertIsNone(informer.cache.get_by_key("default/pod-delete"))
646+
self.assertIsNotNone(informer.cache.get_by_key("default/pod-keep"))
647+
398648
def test_resource_version_stored_from_watch(self):
399649
"""After the watch stream ends the latest RV is preserved for reconnect."""
400650
pod = _make_pod("default", "rv-pod")

0 commit comments

Comments
 (0)