1919import static org .mockito .Matchers .*;
2020import static org .mockito .Mockito .*;
2121
22+ import java .util .Random ;
2223import java .util .concurrent .ExecutorService ;
2324import java .util .concurrent .Executors ;
2425import java .util .concurrent .Future ;
@@ -68,10 +69,18 @@ public final class SynchronizedObserver<T> implements Observer<T> {
6869 private final SafeObservableSubscription subscription ;
6970 private volatile boolean finishRequested = false ;
7071 private volatile boolean finished = false ;
72+ private volatile Object lock ;
7173
7274 public SynchronizedObserver (Observer <? super T > Observer , SafeObservableSubscription subscription ) {
7375 this .observer = Observer ;
7476 this .subscription = subscription ;
77+ this .lock = this ;
78+ }
79+
80+ public SynchronizedObserver (Observer <? super T > Observer , SafeObservableSubscription subscription , Object lock ) {
81+ this .observer = Observer ;
82+ this .subscription = subscription ;
83+ this .lock = lock ;
7584 }
7685
7786 /**
@@ -80,16 +89,15 @@ public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscrip
8089 * @param Observer
8190 */
8291 public SynchronizedObserver (Observer <? super T > Observer ) {
83- this .observer = Observer ;
84- this .subscription = new SafeObservableSubscription ();
92+ this (Observer , new SafeObservableSubscription ());
8593 }
8694
8795 public void onNext (T arg ) {
8896 if (finished || finishRequested || subscription .isUnsubscribed ()) {
8997 // if we're already stopped, or a finish request has been received, we won't allow further onNext requests
9098 return ;
9199 }
92- synchronized (this ) {
100+ synchronized (lock ) {
93101 // check again since this could have changed while waiting
94102 if (finished || finishRequested || subscription .isUnsubscribed ()) {
95103 // if we're already stopped, or a finish request has been received, we won't allow further onNext requests
@@ -105,7 +113,7 @@ public void onError(Throwable e) {
105113 return ;
106114 }
107115 finishRequested = true ;
108- synchronized (this ) {
116+ synchronized (lock ) {
109117 // check again since this could have changed while waiting
110118 if (finished || subscription .isUnsubscribed ()) {
111119 return ;
@@ -121,7 +129,7 @@ public void onCompleted() {
121129 return ;
122130 }
123131 finishRequested = true ;
124- synchronized (this ) {
132+ synchronized (lock ) {
125133 // check again since this could have changed while waiting
126134 if (finished || subscription .isUnsubscribed ()) {
127135 return ;
@@ -188,6 +196,46 @@ public void testMultiThreadedBasic() {
188196 assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
189197 }
190198
199+ @ Test
200+ public void testMultiThreadedBasicWithLock () {
201+ Subscription s = mock (Subscription .class );
202+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable (s , "one" , "two" , "three" );
203+ Observable <String > w = Observable .create (onSubscribe );
204+
205+ SafeObservableSubscription as = new SafeObservableSubscription (s );
206+ BusyObserver busyObserver = new BusyObserver ();
207+
208+ Object lock = new Object ();
209+ ExternalBusyThread externalBusyThread = new ExternalBusyThread (busyObserver , lock , 10 , 100 );
210+
211+ SynchronizedObserver <String > aw = new SynchronizedObserver <String >(busyObserver , as , lock );
212+
213+ externalBusyThread .start ();
214+
215+ w .subscribe (aw );
216+ onSubscribe .waitToFinish ();
217+
218+ try {
219+ externalBusyThread .join (10000 );
220+ assertFalse (externalBusyThread .isAlive ());
221+ assertFalse (externalBusyThread .fail );
222+ } catch (InterruptedException e ) {
223+ // ignore
224+ }
225+
226+ assertEquals (3 , busyObserver .onNextCount .get ());
227+ assertFalse (busyObserver .onError );
228+ assertTrue (busyObserver .onCompleted );
229+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
230+ // so commenting out for now as this is not a critical thing to test here
231+ // verify(s, times(1)).unsubscribe();
232+
233+ // we can have concurrency ...
234+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
235+ // ... but the onNext execution should be single threaded
236+ assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
237+ }
238+
191239 @ Test
192240 public void testMultiThreadedWithNPE () {
193241 Subscription s = mock (Subscription .class );
@@ -220,6 +268,52 @@ public void testMultiThreadedWithNPE() {
220268 assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
221269 }
222270
271+ @ Test
272+ public void testMultiThreadedWithNPEAndLock () {
273+ Subscription s = mock (Subscription .class );
274+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable (s , "one" , "two" , "three" , null );
275+ Observable <String > w = Observable .create (onSubscribe );
276+
277+ SafeObservableSubscription as = new SafeObservableSubscription (s );
278+ BusyObserver busyObserver = new BusyObserver ();
279+
280+ Object lock = new Object ();
281+ ExternalBusyThread externalBusyThread = new ExternalBusyThread (busyObserver , lock , 10 , 100 );
282+
283+ SynchronizedObserver <String > aw = new SynchronizedObserver <String >(busyObserver , as , lock );
284+
285+ externalBusyThread .start ();
286+
287+ w .subscribe (aw );
288+ onSubscribe .waitToFinish ();
289+
290+ try {
291+ externalBusyThread .join (10000 );
292+ assertFalse (externalBusyThread .isAlive ());
293+ assertFalse (externalBusyThread .fail );
294+ } catch (InterruptedException e ) {
295+ // ignore
296+ }
297+
298+ System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
299+
300+ // we can't know how many onNext calls will occur since they each run on a separate thread
301+ // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
302+ // assertEquals(3, busyObserver.onNextCount.get());
303+ assertTrue (busyObserver .onNextCount .get () < 4 );
304+ assertTrue (busyObserver .onError );
305+ // no onCompleted because onError was invoked
306+ assertFalse (busyObserver .onCompleted );
307+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
308+ // so commenting out for now as this is not a critical thing to test here
309+ //verify(s, times(1)).unsubscribe();
310+
311+ // we can have concurrency ...
312+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
313+ // ... but the onNext execution should be single threaded
314+ assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
315+ }
316+
223317 @ Test
224318 public void testMultiThreadedWithNPEinMiddle () {
225319 Subscription s = mock (Subscription .class );
@@ -250,6 +344,50 @@ public void testMultiThreadedWithNPEinMiddle() {
250344 assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
251345 }
252346
347+ @ Test
348+ public void testMultiThreadedWithNPEinMiddleAndLock () {
349+ Subscription s = mock (Subscription .class );
350+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable (s , "one" , "two" , "three" , null , "four" , "five" , "six" , "seven" , "eight" , "nine" );
351+ Observable <String > w = Observable .create (onSubscribe );
352+
353+ SafeObservableSubscription as = new SafeObservableSubscription (s );
354+ BusyObserver busyObserver = new BusyObserver ();
355+
356+ Object lock = new Object ();
357+ ExternalBusyThread externalBusyThread = new ExternalBusyThread (busyObserver , lock , 10 , 100 );
358+
359+ SynchronizedObserver <String > aw = new SynchronizedObserver <String >(busyObserver , as , lock );
360+
361+ externalBusyThread .start ();
362+
363+ w .subscribe (aw );
364+ onSubscribe .waitToFinish ();
365+
366+ try {
367+ externalBusyThread .join (10000 );
368+ assertFalse (externalBusyThread .isAlive ());
369+ assertFalse (externalBusyThread .fail );
370+ } catch (InterruptedException e ) {
371+ // ignore
372+ }
373+
374+ System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
375+ // this should not be the full number of items since the error should stop it before it completes all 9
376+ System .out .println ("onNext count: " + busyObserver .onNextCount .get ());
377+ assertTrue (busyObserver .onNextCount .get () < 9 );
378+ assertTrue (busyObserver .onError );
379+ // no onCompleted because onError was invoked
380+ assertFalse (busyObserver .onCompleted );
381+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
382+ // so commenting out for now as this is not a critical thing to test here
383+ // verify(s, times(1)).unsubscribe();
384+
385+ // we can have concurrency ...
386+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
387+ // ... but the onNext execution should be single threaded
388+ assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
389+ }
390+
253391 /**
254392 * A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order
255393 * events on many threads.
@@ -617,14 +755,32 @@ private static class BusyObserver implements Observer<String> {
617755
618756 @ Override
619757 public void onCompleted () {
758+ threadsRunning .incrementAndGet ();
759+
620760 System .out .println (">>> BusyObserver received onCompleted" );
621761 onCompleted = true ;
762+
763+ int concurrentThreads = threadsRunning .get ();
764+ int maxThreads = maxConcurrentThreads .get ();
765+ if (concurrentThreads > maxThreads ) {
766+ maxConcurrentThreads .compareAndSet (maxThreads , concurrentThreads );
767+ }
768+ threadsRunning .decrementAndGet ();
622769 }
623770
624771 @ Override
625772 public void onError (Throwable e ) {
773+ threadsRunning .incrementAndGet ();
774+
626775 System .out .println (">>> BusyObserver received onError: " + e .getMessage ());
627776 onError = true ;
777+
778+ int concurrentThreads = threadsRunning .get ();
779+ int maxThreads = maxConcurrentThreads .get ();
780+ if (concurrentThreads > maxThreads ) {
781+ maxConcurrentThreads .compareAndSet (maxThreads , concurrentThreads );
782+ }
783+ threadsRunning .decrementAndGet ();
628784 }
629785
630786 @ Override
@@ -652,6 +808,70 @@ public void onNext(String args) {
652808
653809 }
654810
811+ private static class ExternalBusyThread extends Thread {
812+
813+ private BusyObserver observer ;
814+ private Object lock ;
815+ private int lockTimes ;
816+ private int waitTime ;
817+ public volatile boolean fail ;
818+
819+ public ExternalBusyThread (BusyObserver observer , Object lock , int lockTimes , int waitTime ) {
820+ this .observer = observer ;
821+ this .lock = lock ;
822+ this .lockTimes = lockTimes ;
823+ this .waitTime = waitTime ;
824+ this .fail = false ;
825+ }
826+
827+ @ Override
828+ public void run () {
829+ Random r = new Random ();
830+ for (int i = 0 ; i < lockTimes ; i ++) {
831+ synchronized (lock ) {
832+ int oldOnNextCount = observer .onNextCount .get ();
833+ boolean oldOnCompleted = observer .onCompleted ;
834+ boolean oldOnError = observer .onError ;
835+ try {
836+ Thread .sleep (r .nextInt (waitTime ));
837+ } catch (InterruptedException e ) {
838+ // ignore
839+ }
840+ // Since we own the lock, onNextCount, onCompleted and
841+ // onError must not be changed.
842+ int newOnNextCount = observer .onNextCount .get ();
843+ boolean newOnCompleted = observer .onCompleted ;
844+ boolean newOnError = observer .onError ;
845+ if (oldOnNextCount != newOnNextCount ) {
846+ System .out .println (">>> ExternalBusyThread received different onNextCount: "
847+ + oldOnNextCount
848+ + " -> "
849+ + newOnNextCount );
850+ fail = true ;
851+ break ;
852+ }
853+ if (oldOnCompleted != newOnCompleted ) {
854+ System .out .println (">>> ExternalBusyThread received different onCompleted: "
855+ + oldOnCompleted
856+ + " -> "
857+ + newOnCompleted );
858+ fail = true ;
859+ break ;
860+ }
861+ if (oldOnError != newOnError ) {
862+ System .out .println (">>> ExternalBusyThread received different onError: "
863+ + oldOnError
864+ + " -> "
865+ + newOnError );
866+ fail = true ;
867+ break ;
868+ }
869+ }
870+ }
871+ }
872+
873+ }
874+
655875 }
656876
657877}
0 commit comments