@@ -775,7 +775,6 @@ open Microsoft.Azure.Documents
775775open Serilog
776776open System
777777open System.Collections .Concurrent
778- open System.Runtime .Caching
779778
780779/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
781780type Connection ( client : Client.DocumentClient , [< O ; D ( null )>]? readRetryPolicy : IRetryPolicy , [< O ; D ( null )>]? writeRetryPolicy ) =
@@ -898,16 +897,15 @@ module Caching =
898897 let! intercepted = intercept stream ( token', state')
899898 return SyncResult.Written( intercepted) }
900899
901-
902900 let applyCacheUpdatesWithSlidingExpiration
903901 ( cache : ICache )
904902 ( prefix : string )
905903 ( slidingExpiration : TimeSpan )
906904 ( category : ICategory < 'event , 'state , Container * string >)
907905 : ICategory < 'event , 'state , Container * string > =
908- let cacheEntryGenerator ( initialToken : StreamToken , initialState : 'state ) = new CacheEntry< 'state>( initialToken, initialState, Token.supersedes)
906+ let mkCacheEntry ( initialToken : StreamToken , initialState : 'state ) = new CacheEntry< 'state>( initialToken, initialState, Token.supersedes)
909907 let policy = CacheItemOptions.RelativeExpiration( slidingExpiration)
910- let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy ( prefix + streamName)
908+ let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy ( prefix + streamName)
911909 CategoryTee< 'event, 'state>( category, addOrUpdateSlidingExpirationCacheEntry) :> _
912910
913911type private Folder < 'event , 'state >
@@ -918,17 +916,16 @@ type private Folder<'event, 'state>
918916 let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
919917 interface ICategory< 'event, 'state, Container* string> with
920918 member __.Load containerStream ( log : ILogger ): Async < StreamToken * 'state > = async {
921- let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
922- let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
923- match readCache with
919+ let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
920+ let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
921+ match readCache with
922+ | None -> return batched
923+ | Some ( cache : ICache , prefix : string ) ->
924+ let! cacheItem = cache.TryGet( prefix + snd containerStream)
925+ match cacheItem with
924926 | None -> return batched
925- | Some ( cache : ICache , prefix : string ) ->
926- let! cacheItem = cache.TryGet( prefix + snd containerStream)
927- match cacheItem with
928- | None -> return batched
929- | Some tokenAndState -> return ! cached tokenAndState
930- }
931- member __.TrySync ( log : ILogger ) ( streamToken , state ) ( events : 'event list )
927+ | Some tokenAndState -> return ! cached tokenAndState }
928+ member __.TrySync ( log : ILogger ) ( streamToken , state ) ( events : 'event list )
932929 : Async < SyncResult < 'state >> = async {
933930 let! res = category.Sync(( streamToken, state), events, mapUnfolds, fold, isOrigin, log)
934931 match res with
0 commit comments