File tree Expand file tree Collapse file tree 1 file changed +37
-0
lines changed
core/shared/src/test/scala/fs2/concurrent Expand file tree Collapse file tree 1 file changed +37
-0
lines changed Original file line number Diff line number Diff line change @@ -186,6 +186,43 @@ class TopicSuite extends Fs2Suite {
186186 TestControl .executeEmbed(program) // will fail if program is deadlocked
187187 }
188188
189+ // https://github.com/typelevel/fs2/issues/3644
190+ test(
191+ " when publish1 returns success, subscribers must receive the event, even if the publish1 races with close" .fail
192+ ) {
193+ val check : IO [Unit ] =
194+ Topic [IO , String ]
195+ .flatMap { t =>
196+ t.subscribeAwaitUnbounded.replicateA(100 ).use { subs =>
197+ IO .both(t.publish1(" foo" ), t.close) // racing publish1 and close
198+ .flatMap {
199+ case (_, Left (_)) =>
200+ fail(" There's no reason for Topic closure to fail" )
201+ case (published, Right (())) =>
202+ // the topic is closed
203+ subs
204+ .traverse(sub =>
205+ sub.compile.toList // all subscriptions must terminate, since the Topic was closed
206+ )
207+ .map { eventss =>
208+ val expected : List [String ] =
209+ published match {
210+ case Right (()) =>
211+ // publication succeeded, expecting singleton list with the event
212+ List (" foo" )
213+ case Left (Topic .Closed ) =>
214+ // publication rejected, expecting empty list
215+ Nil
216+ }
217+ eventss.foreach(events => assertEquals(events, expected))
218+ }
219+ }
220+ }
221+ }
222+
223+ check.replicateA_(1000 )
224+ }
225+
189226 // https://github.com/typelevel/fs2/issues/3642
190227 test(" subscribe and close concurrently" .flaky) {
191228 val check : IO [Unit ] =
You can’t perform that action at this time.
0 commit comments