11//! A multi-producer, single-consumer, futures-aware, FIFO queue.
22use std:: any:: Any ;
3+ use std:: cell:: RefCell ;
34use std:: collections:: VecDeque ;
45use std:: error:: Error ;
56use std:: fmt;
67use std:: pin:: Pin ;
8+ use std:: rc:: Rc ;
79use std:: task:: { Context , Poll } ;
810
911use futures_sink:: Sink ;
1012use futures_util:: stream:: Stream ;
1113
12- use crate :: cell:: Cell ;
1314use crate :: task:: LocalWaker ;
1415
1516/// Creates a unbounded in-memory channel with buffered storage.
1617pub fn channel < T > ( ) -> ( Sender < T > , Receiver < T > ) {
17- let shared = Cell :: new ( Shared {
18+ let shared = Rc :: new ( RefCell :: new ( Shared {
1819 has_receiver : true ,
1920 buffer : VecDeque :: new ( ) ,
2021 blocked_recv : LocalWaker :: new ( ) ,
21- } ) ;
22+ } ) ) ;
2223 let sender = Sender {
2324 shared : shared. clone ( ) ,
2425 } ;
@@ -38,15 +39,15 @@ struct Shared<T> {
3839/// This is created by the `channel` function.
3940#[ derive( Debug ) ]
4041pub struct Sender < T > {
41- shared : Cell < Shared < T > > ,
42+ shared : Rc < RefCell < Shared < T > > > ,
4243}
4344
4445impl < T > Unpin for Sender < T > { }
4546
4647impl < T > Sender < T > {
4748 /// Sends the provided message along this channel.
4849 pub fn send ( & self , item : T ) -> Result < ( ) , SendError < T > > {
49- let shared = unsafe { self . shared . get_mut_unsafe ( ) } ;
50+ let mut shared = self . shared . borrow_mut ( ) ;
5051 if !shared. has_receiver {
5152 return Err ( SendError ( item) ) ; // receiver was dropped
5253 } ;
@@ -60,7 +61,7 @@ impl<T> Sender<T> {
6061 /// This prevents any further messages from being sent on the channel while
6162 /// still enabling the receiver to drain messages that are buffered.
6263 pub fn close ( & mut self ) {
63- self . shared . get_mut ( ) . has_receiver = false ;
64+ self . shared . borrow_mut ( ) . has_receiver = false ;
6465 }
6566}
6667
@@ -94,8 +95,8 @@ impl<T> Sink<T> for Sender<T> {
9495
9596impl < T > Drop for Sender < T > {
9697 fn drop ( & mut self ) {
97- let count = self . shared . strong_count ( ) ;
98- let shared = self . shared . get_mut ( ) ;
98+ let count = Rc :: strong_count ( & self . shared ) ;
99+ let shared = self . shared . borrow_mut ( ) ;
99100
100101 // check is last sender is about to drop
101102 if shared. has_receiver && count == 2 {
@@ -110,7 +111,7 @@ impl<T> Drop for Sender<T> {
110111/// This is created by the `channel` function.
111112#[ derive( Debug ) ]
112113pub struct Receiver < T > {
113- shared : Cell < Shared < T > > ,
114+ shared : Rc < RefCell < Shared < T > > > ,
114115}
115116
116117impl < T > Receiver < T > {
@@ -127,23 +128,24 @@ impl<T> Unpin for Receiver<T> {}
127128impl < T > Stream for Receiver < T > {
128129 type Item = T ;
129130
130- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
131- if self . shared . strong_count ( ) == 1 {
131+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
132+ let mut shared = self . shared . borrow_mut ( ) ;
133+ if Rc :: strong_count ( & self . shared ) == 1 {
132134 // All senders have been dropped, so drain the buffer and end the
133135 // stream.
134- Poll :: Ready ( self . shared . get_mut ( ) . buffer . pop_front ( ) )
135- } else if let Some ( msg) = self . shared . get_mut ( ) . buffer . pop_front ( ) {
136+ Poll :: Ready ( shared. buffer . pop_front ( ) )
137+ } else if let Some ( msg) = shared. buffer . pop_front ( ) {
136138 Poll :: Ready ( Some ( msg) )
137139 } else {
138- self . shared . get_mut ( ) . blocked_recv . register ( cx. waker ( ) ) ;
140+ shared. blocked_recv . register ( cx. waker ( ) ) ;
139141 Poll :: Pending
140142 }
141143 }
142144}
143145
144146impl < T > Drop for Receiver < T > {
145147 fn drop ( & mut self ) {
146- let shared = self . shared . get_mut ( ) ;
148+ let mut shared = self . shared . borrow_mut ( ) ;
147149 shared. buffer . clear ( ) ;
148150 shared. has_receiver = false ;
149151 }
0 commit comments