Upgrading from 0.8 or earlier? A lot has changed, and this document is intended to make the process easier. If you notice anything missing, submit a PR.
- Library now has zero third-party dependencies; instead there are bindings to both scalaz and cats as separate libraries, see fs2-scalaz and fs2-cats
- Much more expressive stream transformation primitives, including support for pushback, prefetching, arbitrary use of asynchronous steps, and the ability to transform any number of streams. This is much more flexible than the previous approach of baking in support for a few fixed 'shapes' like `Process1`, `Tee`, and `Wye`.
- Chunking now baked into the library along with support for working with unboxed chunks of primitives; most library operations try to preserve chunkiness whenever possible
- Library no longer reliant on `Task` and users can bring their own effect types
- The async package has been generalized to work with any effect type with an `Async` instance. Added `Semaphore`, an asynchronous semaphore, used as a concurrency primitive in various places.
- New functionality in `pipe` for forking a stream and sending output through two branches. Used to implement `observe` and `observeAsync` and some experimental combinators (`pipe.join`).
- Library now implemented atop a small set of core primitives; there is only one stream interpreter, about 45 LOC, which does not use casts, rest of library could be implemented in 'userspace'
- Various resource safety corner cases have all been addressed and tested, in particular, resource cleanup works in all cases of asynchronous allocation
Stateful transformations like `take` and so on are defined in a completely different way, using the `Pull` data type. See this section of the guide for details.
All resources should be acquired using `bracket`. Standalone cleanup actions should be placed in an `onFinalize`. The `onComplete` method has been removed as it did not guarantee that its parameter would be run if a stream was consumed asynchronously or was terminated early by its consumer.
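To make the acquire/use/release shape concrete, here is a minimal sketch in plain Scala. It does not use fs2 at all: the `bracket` helper below is a hypothetical stand-in with a simplified signature, and it only illustrates the guarantee described above, namely that the release step runs even when the consumer stops early with a failure.

```scala
import scala.collection.mutable.ListBuffer

object BracketSketch {
  // Records each step so the ordering can be inspected afterwards.
  val log = ListBuffer.empty[String]

  // Hypothetical helper, NOT fs2's signature: acquire, use, and always release.
  def bracket[R, A](acquire: => R)(use: R => A, release: R => Unit): A = {
    val resource = acquire
    try use(resource)
    finally release(resource) // runs on normal completion and on failure
  }

  // Simulates a consumer that fails partway through using the resource.
  def demo(): List[String] = {
    log.clear()
    try {
      bracket[String, Unit] { log += "acquire"; "handle" }(
        use = _ => { log += "use"; throw new RuntimeException("consumer stopped early") },
        release = _ => { log += "release"; () }
      )
    } catch {
      case _: RuntimeException => log += "recovered"
    }
    log.toList
  }
}
```

In this sketch the release step is recorded before the failure reaches the caller, which is the ordering a cleanup action needs in order to be dependable.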
- `Process` has been renamed to `Stream`.
- There are no `Process1`, `Tee`, `Wye`, or `Channel` type aliases. We decided to simplify. Instead, we have just:
  - `type Pipe[F,A,B] = Stream[F,A] => Stream[F,B]`
  - `type Pipe2[F,A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]`
  - `Pipe` covers what `Channel` and `Process1` could do before
  - `Pipe2` covers what `Tee` and `Wye` could do before
  - see the code for the package object
- Following this renaming, any functions in `process1` have been moved to the module `pipe`, and `tee` and `wye` have both been combined into `pipe2`.
- `mergeN` is now `concurrent.join`
- New functions `pipe.unNoneTerminate` and `Stream.noneTerminate`
- New functions `pipe2.mergeDrainL`, `pipe2.mergeDrainR`
- For transforming a stream, instead of `pipe`, `tee`, `wye`, and `through` methods on `Stream`, there is now just `through` and `through2`:
  - Example - Before: `s.pipe(process1.take(10))` After: `s.through(pipe.take(10))`
  - Example - Before: `s.wye(s2)(wye.blah)` After: `s.through2(s2)(pipe2.blah)`
- Use `t.onFinalize(eff)` instead of `t.onComplete(Stream.eval_(eff))`
- `onHalt` no longer exists
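
Because `Pipe` and `Pipe2` are plain function aliases, `through` and `through2` amount to function application. The sketch below shows that idea with a toy stand-in, assuming nothing about fs2's internals: `ToyStream`, `Id`, and this `zip` are hypothetical names invented for illustration, and the toy type wraps a `List` and ignores effects entirely.

```scala
object PipeSketch {
  // Toy stand-in for Stream, NOT the real fs2 type: F is a phantom effect parameter.
  final case class ToyStream[F[_], A](values: List[A]) {
    // Applying a Pipe is ordinary function application.
    def through[B](f: ToyStream[F, A] => ToyStream[F, B]): ToyStream[F, B] =
      f(this)

    // Applying a Pipe2 passes this stream and a second one to a binary function.
    def through2[B, C](other: ToyStream[F, B])(
        f: (ToyStream[F, A], ToyStream[F, B]) => ToyStream[F, C]
    ): ToyStream[F, C] = f(this, other)
  }

  // Same shape as the aliases in the list above, over the toy type.
  type Pipe[F[_], A, B] = ToyStream[F, A] => ToyStream[F, B]
  type Pipe2[F[_], A, B, C] = (ToyStream[F, A], ToyStream[F, B]) => ToyStream[F, C]

  // Hypothetical identity effect, used only to fill in the F parameter.
  type Id[A] = A

  // A one-input transformation: keep the first n elements.
  def take[F[_], A](n: Int): Pipe[F, A, A] =
    s => ToyStream(s.values.take(n))

  // A two-input transformation: pair up elements positionally.
  def zip[F[_], A, B]: Pipe2[F, A, B, (A, B)] =
    (l, r) => ToyStream(l.values.zip(r.values))

  val numbers: ToyStream[Id, Int] = ToyStream[Id, Int](List(1, 2, 3, 4))
  val letters: ToyStream[Id, String] = ToyStream[Id, String](List("a", "b"))

  val firstTwo: List[Int] = numbers.through(take[Id, Int](2)).values
  val paired: List[(Int, String)] =
    numbers.through2(letters)(zip[Id, Int, String]).values
}
```

Since a pipe is just a function, pipes compose with ordinary function composition (`andThen`), which is part of what makes the two aliases sufficient to replace the older fixed shapes.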