Modify BufferPool prefetch to async load blocks #149
asimmahmood1 wants to merge 26 commits into opensearch-project:main from
Conversation
* length param is honored
* use OS threadpool model; the new fixed threadpool is called 'crypto_plugin_prefetch_threadpool'
* currently uses available processors
* TODO: will add a config to make it a factor of available processors, likely defaulting to 2.0
* single async call that loads all blocks; can be modified to load all blocks in parallel if needed
* will do more performance tests

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
JMH - cold path
* block size 4kb
* write using directIO so it's not mmaped
* 40kp open using mmap channel io
* loop: read 1-10 hot
* warm up, then prefetch and read
* prefetch is mostly IO work, so threads will be blocked
* also, prefetch will only be called in the search path, which has fixed threads
* check cache first before prefetch
* the cache check may act as dedup; not sure if a dedicated dedup strategy is needed
* will add JMH benchmarks; osb isn't showing any change

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
* similar to Lucene's stored fields reader
* use a long[32] array of startOffset, which is checked first
* this array is created per file; slices share the array
* scaling threadpool doesn't have a queue; switch back to using fixed

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
final FileBlockCacheKey firstBlockKey = new FileBlockCacheKey(path, startBlockOffset);
if (blockCache.get(firstBlockKey) != null) {
    return;
}
Can you please explain why the check for the first block is needed here? If the first block is present, that doesn't mean the others are also present in the queue, right?
I should explain that in a comment as well: this is the simplest approach, and it might reload some blocks. What is the probability that subsequent blocks are missing if the 1st one is available? I would argue it is unlikely, since prefetch is useful for sequential reads.
Alternatives are:
- Load after the 1st missing block - should still be simple. I'm ok to go with this alternative. Even though the blockCache lookup is sync, it's still cheaper than IO.
- Check each missing block, then load them separately. To reduce the IO calls, it'd be better to collect contiguous blocks. Is it worth the effort?
Ideally I would add some metrics and test out all 3 using some benchmarks. On the other hand, I'm not familiar with non-search use cases like knn.
Another thing: as we improve our readahead algorithm, readaheads should also be able to catch up for subsequent consecutive blocks.
* found a bug in loadForPrefetch: it doesn't check the cache first

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
* before, it was loading (IO) all blocks regardless of cache entries
* now it loads only missing cache values
* contiguous missing blocks are combined into a single load call
* TODO: add metrics

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
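A minimal sketch of the "combine contiguous missing blocks into one load call" idea described in this commit. The names here (`PrefetchCoalesce`, `BlockRange`, `coalesceMissing`) are illustrative, not the PR's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: given the block offsets a prefetch wants and the set of
// offsets already cached, emit one load range per contiguous run of misses.
public class PrefetchCoalesce {

    // A half-open range of file offsets [start, end) to load with a single IO call.
    public record BlockRange(long start, long end) {}

    public static List<BlockRange> coalesceMissing(long[] blockOffsets, Set<Long> cached, long blockSize) {
        List<BlockRange> ranges = new ArrayList<>();
        long runStart = -1;
        for (long off : blockOffsets) {
            if (cached.contains(off)) {
                if (runStart != -1) {            // a cached block closes the current run
                    ranges.add(new BlockRange(runStart, off));
                    runStart = -1;
                }
            } else if (runStart == -1) {
                runStart = off;                  // first miss opens a new run
            }
        }
        if (runStart != -1) {                    // close a run that reaches the end
            long last = blockOffsets[blockOffsets.length - 1];
            ranges.add(new BlockRange(runStart, last + blockSize));
        }
        return ranges;
    }

    public static void main(String[] args) {
        long bs = 8192;
        long[] offsets = {0, bs, 2 * bs, 3 * bs};
        // block at offset 8192 already cached -> two ranges: [0,8192) and [16384,32768)
        System.out.println(coalesceMissing(offsets, Set.of(bs), bs));
    }
}
```

With this shape, the "check each missing block, load separately" alternative collapses into at most one IO call per run of misses, at the cost of one cache lookup per block.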
// Use cache size to determine, but double it so we're more aggressive than read ahead
if (queueSize == -1) {
    queueSize = ReadAheadSizingPolicy.calculateQueueSize(maxCacheBlocks) * 2;
Prefetch should be called for blocks that are deterministically going to be accessed, instead of being speculative like read-ahead. With that in mind, should we keep the queue size the same as maxCacheBlocks, since that is the number of blocks we should be able to prefetch as part of one or more search requests? Whereas currently read-ahead is done speculatively, without being IOContext aware, which can lead to unnecessary cache churn. @abiesps thoughts?
// Use cache size to determine, but double it so we're more aggressive than read ahead
if (queueSize == -1) {
    queueSize = ReadAheadSizingPolicy.calculateQueueSize(maxCacheBlocks) * 2;
> With that in mind, should we keep the queue size same as maxCacheBlocks as that is the number of blocks we should be able to prefetch as part of one or more search requests.

maxCacheBlocks will be in the 10s of thousands (remember each block is 8kb); a queue that large is better off as an unbounded queue. Anyway, if you think you need to prefetch all the cache blocks, your caching is really screwed up.
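The trade-off being debated above (bounded queue sized off the cache, vs effectively unbounded at maxCacheBlocks) can be sketched with a plain `ThreadPoolExecutor`. The sizing formula and `calculateQueueSize` stand-in below are assumptions for illustration, not the plugin's actual code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: a bounded queue derived from the cache size, with a discard
// policy so prefetch requests are dropped (not blocked on) when IO falls behind.
public class PrefetchPoolSketch {

    // Hypothetical stand-in for ReadAheadSizingPolicy.calculateQueueSize(maxCacheBlocks):
    // cap well below maxCacheBlocks, since a queue of tens of thousands of entries
    // would behave like an unbounded queue in practice.
    static int calculateQueueSize(long maxCacheBlocks) {
        return (int) Math.min(maxCacheBlocks / 64, 4096);
    }

    static ThreadPoolExecutor newPrefetchPool(int threads, long maxCacheBlocks) {
        int queueSize = calculateQueueSize(maxCacheBlocks) * 2; // 2x: more aggressive than read-ahead
        return new ThreadPoolExecutor(
            threads, threads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueSize),
            new ThreadPoolExecutor.DiscardPolicy()   // drop prefetches rather than stall callers
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPrefetchPool(4, 80_000);
        System.out.println(pool.getQueue().remainingCapacity()); // 2 * min(80000/64, 4096) = 2500
        pool.shutdown();
    }
}
```

The discard policy matters as much as the size: a full queue should shed speculative work instead of back-pressuring the search path.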
Please run an OSB http_log or big_5 and post results with and without this change.
* Based on the discussion, will estimate the default threadpool size to be (search + index_searcher) * 4. Since this prefetch will mostly be blocked on IO, and it's trying to help the search path by prefetching, we want to be more aggressive.
* For queue size: for search, Lucene itself only calls with block size 1 and there might be 10s of calls per query, but knn can be much worse, e.g. with 32 neighbors there can be 1000 calls. So we'll estimate threads * 1000 as the default. Will tune this in the future based on benchmark results.

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
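The defaults described above reduce to two small formulas. A sketch, with illustrative method names (the actual setting names in the plugin may differ):

```java
// Sketch of the default sizing from the commit message above.
public class PrefetchSizing {

    // prefetch threads: (search + index_searcher) * 4, since the work is mostly blocked on IO
    static int prefetchThreads(int searchThreads, int indexSearcherThreads) {
        return (searchThreads + indexSearcherThreads) * 4;
    }

    // queue: threads * 1000, to absorb the worst case (e.g. knn issuing ~1000 calls per query)
    static int prefetchQueueSize(int prefetchThreads) {
        return prefetchThreads * 1000;
    }

    public static void main(String[] args) {
        // e.g. a node with 13 search threads and 4 index_searcher threads (illustrative numbers)
        int threads = prefetchThreads(13, 4);
        System.out.println(threads);                    // 68
        System.out.println(prefetchQueueSize(threads)); // 68000
    }
}
```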
* use a ConcurrentHashMap to dedup
* the map is created per file but shared across slices; this avoids a shared map across each directory, keeps the concurrency load low, and uses a simple offset (long) as key
* FastUtil would be even faster, but don't want to introduce a new dependency
* added unit tests; will add JMH to prove the improvement

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
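The per-file dedup map described above can be sketched as follows; `tryClaim`/`release` are illustrative names, not the PR's actual methods:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch: the block's file offset (long) is the key, and putIfAbsent decides
// which caller actually issues the load. One instance per file, shared by slices.
public class PrefetchDedup {

    private final ConcurrentHashMap<Long, Boolean> inFlight = new ConcurrentHashMap<>();

    // returns true if this caller won the race and should perform the load
    public boolean tryClaim(long blockOffset) {
        return inFlight.putIfAbsent(blockOffset, Boolean.TRUE) == null;
    }

    // called when the load completes (or fails) so the block can be claimed again
    public void release(long blockOffset) {
        inFlight.remove(blockOffset);
    }

    public static void main(String[] args) {
        PrefetchDedup d = new PrefetchDedup();
        System.out.println(d.tryClaim(8192)); // true: first caller loads
        System.out.println(d.tryClaim(8192)); // false: duplicate prefetch is skipped
        d.release(8192);
        System.out.println(d.tryClaim(8192)); // true again after release
    }
}
```

Boxing a `long` key is the cost being traded against the FastUtil dependency mentioned above; a primitive-keyed map would avoid the allocation.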
* read ahead is only called when there is a cache miss
* while search prefetch may already be cached

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
* now the executor is passed into CaffeineBlockCache
* single map per node, instead of per file

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
if (prefetchCache != null) {
    prefetchCache.keySet().removeIf(key -> key instanceof FileBlockCacheKey fk && fk.filePath().equals(normalized));
}
In what case will a key that is getting invalidated be in prefetchCache? If it is in prefetchCache, that means it was not found in the block cache and the download for the key is in progress.
 * @param maxBlocks the maximum number of blocks to cache (currently unused but kept for API compatibility)
 */
public CaffeineBlockCache(Cache<BlockCacheKey, BlockCacheValue<T>> cache, BlockLoader<V> blockLoader, long maxBlocks) {
    this(cache, blockLoader, maxBlocks, null, null);
Why do we want to support a null cache and executor inside the block cache? Avoiding that would also help keep the code simple in the load methods, by avoiding all the null checks.
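One common way to address this without changing callers is a null-object default: store a never-null executor and let the inline case be a same-thread `Executor`. The class and field names below are illustrative, not CaffeineBlockCache's actual code:

```java
import java.util.concurrent.Executor;

// Sketch: default a nullable constructor argument to a same-thread executor,
// so load methods never need a null check before prefetchExecutor.execute(...).
public class NullSafeCacheSketch {

    private final Executor prefetchExecutor;

    public NullSafeCacheSketch(Executor prefetchExecutor) {
        // Runnable::run is a valid Executor that runs the task inline on the caller's thread.
        this.prefetchExecutor = prefetchExecutor != null ? prefetchExecutor : Runnable::run;
    }

    public String loadMode() {
        StringBuilder where = new StringBuilder();
        prefetchExecutor.execute(() -> where.append("ran"));
        // "ran" when the inline fallback is used; a real pool may run the task later
        return where.toString();
    }

    public static void main(String[] args) {
        System.out.println(new NullSafeCacheSketch(null).loadMode()); // "ran" — inline fallback
    }
}
```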
JMH Test 1
File: 100MB encrypted file (12,800 blocks × 8KB)
Cache layers:
Threads:
Read pattern:
Other changes:
JMH config: 1 warmup + 1 measurement iteration, 10s each, 1 fork, throughput mode
Summary
Next Steps / issues
Detailed stats:

Details (Warmup: 1 iteration, 10 s each; Benchmark mode: Throughput, ops/time; Fork 1 of 1)
* (cacheWarm = true, mode = bufferpool, prefetchEnabled = true): passes=88
* (cacheWarm = true, mode = bufferpool, prefetchEnabled = false): passes=117
* (cacheWarm = false, mode = bufferpool, prefetchEnabled = true): passes=2
* (cacheWarm = false, mode = bufferpool, prefetchEnabled = false): passes=2
* (cacheWarm = true, mode = bufferpool, prefetchEnabled = true): passes=235
* (cacheWarm = true, mode = bufferpool, prefetchEnabled = false): passes=455
* (cacheWarm = false, mode = bufferpool, prefetchEnabled = true): passes=8
* (cacheWarm = false, mode = bufferpool, prefetchEnabled = false): passes=8

PrefetchBufferpoolVsMMapBenchmark.read_1Threads false mmap true thrpt 283.351 ops/ms
Benchmark result is saved to /workplace/asimmahm/opensearch-storage-encryption/build/jmh-results/jmh_20260310_232238.json
JMH Test #2
Same setup as #1, except:
Results:
JMH Test #3
Details (Warmup: 1 iteration, 10 s each; Benchmark mode: Throughput, ops/time; Fork 1 of 1; Warmup Iteration 1 logged: WARNING: Runtime environment or build system does not support multi-release JARs. This will impact location-based features.)
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = async): passes=57606
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = inline_check): passes=142703
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = off): passes=115417
* (cacheWarm = true, executorType = jdk, mode = bufferpool, prefetchMode = async): passes=45118
* (cacheWarm = true, executorType = jdk, mode = bufferpool, prefetchMode = inline_check): passes=123391
* (cacheWarm = true, executorType = jdk, mode = bufferpool, prefetchMode = off): passes=366843
* (parameters truncated in log): passes=137
* (cacheWarm = false, executorType = opensearch, mode = bufferpool, prefetchMode = inline_check): passes=133
* (cacheWarm = false, executorType = opensearch, mode = bufferpool, prefetchMode = off): passes=160
JMH TEST 3
PrefetchBufferpoolVsMMapBenchmark.read_4Threads-Throughput-cacheWarm-true-executorType-opensearch-mode-bufferpool-prefetchMode-async:
PrefetchBufferpoolVsMMapBenchmark.read_4Threads-Throughput-cacheWarm-true-executorType-opensearch-mode-bufferpool-prefetchMode-inline_check
PrefetchBufferpoolVsMMapBenchmark.read_4Threads-Throughput-cacheWarm-true-executorType-opensearch-mode-bufferpool-prefetchMode-off
My interpretation so far is that, since there is a 1:1 ratio of prefetches to reads within the block, the async hand-off is not cheap in this benchmark. There are over 2MM calls to the executor vs the actual work the prefetch threads need to do. In the real world, will there be this many prefetch calls vs read calls? If there are many more reads than prefetches, then this cost should be low. Another option is still to just do the cache check in the search path, and load what's missing async.

JMH Test 4 - prefetch 16 blocks, read 16 times each block

Details (dev-dsk-asimmahm-2c-a6d21262 % ./jmh_compact.sh; Warmup: 1 iteration, 10 s each; Benchmark mode: Throughput, ops/time; Fork 1 of 1; Warmup Iteration 1 logged: WARNING: Runtime environment or build system does not support multi-release JARs. This will impact location-based features.)
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = async): passes=56073
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = inline_check): passes=278737
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = inline_load): passes=274780
* (cacheWarm = true, executorType = opensearch, mode = bufferpool, prefetchMode = off): passes=291510
* (cacheWarm = true, executorType = opensearch, mode = mmap, prefetchMode = async): passes=1977521
* (cacheWarm = true, executorType = opensearch, mode = mmap, prefetchMode = inline_check): passes=1477068
* (cacheWarm = true, executorType = opensearch, mode = mmap, prefetchMode = inline_load): passes=1483658
* (cacheWarm = true, executorType = opensearch, mode = mmap, prefetchMode = off): passes=6117138
* (cacheWarm = false, executorType = opensearch, mode = bufferpool, prefetchMode = async): passes=138
* (cacheWarm = false, executorType = opensearch, mode = bufferpool, prefetchMode = inline_check): passes=136
* (cacheWarm = false, executorType = opensearch, mode = bufferpool, prefetchMode = inline_load): passes=158
* (cacheWarm = false, executorType = opensearch, mode = bufferpool, prefetchMode = off): passes=160
* (cacheWarm = false, executorType = opensearch, mode = mmap, prefetchMode = async): passes=10254
* (cacheWarm = false, executorType = opensearch, mode = mmap, prefetchMode = inline_check): passes=9420
* (cacheWarm = false, executorType = opensearch, mode = mmap, prefetchMode = inline_load): passes=159
* (cacheWarm = false, executorType = opensearch, mode = mmap, prefetchMode = off): passes=11123
JMH Test 5 - 16 prefetch blocks, then full read
PrefetchMode:
cacheWarm:
Result:
Cost of the prefetch async call when the cache is warm (score 28.741), compared to a stubbed/noop prefetch call (score 45.294). The LinkedTransferQueue is not highly performant, so it might be worth trying higher-throughput queues. But before I do that, I'll focus on trying to reduce the IO calls in the search path. @abiesps recently added a new L1 cache (array based), so I'll check that first in the read path.
JMH Test 6 - Added getOrLoad mode
asimmahmood1@6339ead#diff-b91741cefaa9515609d6aa33e0e3d9b7d37c4f64200d4b98ca6d0e1833cf23fd
Details
Hotpath Summary
JMH Test 7 - Compare no prefetch, with tiny block cache vs radix
It's better, but still far away from the 500 score for mmap.
JMH Test 9 - simulate slow IO, reduce readLong
JMH Test 10 - slower IO comparison
JMH 11 - Hot path cost of async
I am trying to see how we can make the hot path of prefetch faster than mmap.
JMH 12 - Hotpath: forkjoin is 3x faster than opensearch threadpool
…d style to dedup load with search
* to avoid duplication with search requests, use getOrLoad
* we lose the IO-collapsing option, but that will be redone at a lower IO layer
* ForkJoin shows 10x improvement compared to the LinkedTransferQueue used by the fixed opensearch threadpool
* 3x improvement compared to mmap prefetch, with isLoaded and without backoff
* added more metrics

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
final long endFileOffset = absoluteBaseOffset + offset + length;
final long endBlockOffset = (endFileOffset + CACHE_BLOCK_MASK) & ~CACHE_BLOCK_MASK;
final long blockCount = (endBlockOffset - startBlockOffset) >>> CACHE_BLOCK_SIZE_POWER;
Are we planning to add the L1 cache short-circuit here? I was thinking to add it now, even with BlockSlotTinyCache, so that once we replace the tiny cache with the Radix table we don't miss it.
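The rounding arithmetic in the quoted snippet can be sanity-checked with a small worked example, assuming 8KB blocks (CACHE_BLOCK_SIZE_POWER = 13, CACHE_BLOCK_MASK = 8191 — an assumption; the PR's actual constants may differ):

```java
// Worked example of the block-range math from the quoted snippet above.
public class BlockMath {
    static final int CACHE_BLOCK_SIZE_POWER = 13;                       // assumed 8KB blocks
    static final long CACHE_BLOCK_MASK = (1L << CACHE_BLOCK_SIZE_POWER) - 1;

    static long blockCount(long startBlockOffset, long absoluteBaseOffset, long offset, long length) {
        final long endFileOffset = absoluteBaseOffset + offset + length;
        // round the end up to the next block boundary, then count blocks by shifting
        final long endBlockOffset = (endFileOffset + CACHE_BLOCK_MASK) & ~CACHE_BLOCK_MASK;
        return (endBlockOffset - startBlockOffset) >>> CACHE_BLOCK_SIZE_POWER;
    }

    public static void main(String[] args) {
        // a 10,000-byte read from offset 0 ends at 10,000, which rounds up to 16,384 -> 2 blocks
        System.out.println(blockCount(0, 0, 0, 10_000)); // 2
        // a read ending exactly on a block boundary does not round up -> still 2 blocks
        System.out.println(blockCount(0, 0, 0, 16_384)); // 2
    }
}
```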
I am also wondering why ForkJoinPool is so much better than the opensearch threadpool. Do we know the reason behind it?
On the cold path of prefetch, how much is the delay? Is there a way I can check how the first-byte read latency varies with mmap and bufferpool as we increase this delay, starting from zero? The ratio of first-byte read latency to actual IO latency (with delay) could be a good metric and benchmark for measuring cold-path prefetch performance. I want to be sure that we don't have any application-level bottlenecks that are adding to this delay unnecessarily. We could also run this benchmark on a host with a 'slower' file system to see how this 'delay' varies with IO latencies from the file system.
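A rough sketch of the measurement being asked for: time-to-first-byte on a cold read with an injected IO delay, so the ratio first-byte-latency / delay can be tracked as the delay grows. The loader below is a stand-in for the real bufferpool/mmap read paths, not the PR's code:

```java
import java.util.function.LongSupplier;

// Sketch: measure first-byte latency around a loader that itself includes an
// injected IO delay; ratio ~1.0 means no application-level overhead on top of the IO.
public class FirstByteLatency {

    static long measure(long ioDelayNanos, LongSupplier loader) {
        long start = System.nanoTime();
        loader.getAsLong();              // simulated cold read, including the injected delay
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long delayNanos = 2_000_000;     // 2 ms injected "disk" latency
        long observed = measure(delayNanos, () -> {
            long end = System.nanoTime() + delayNanos;
            while (System.nanoTime() < end) { /* busy-wait: simulated IO */ }
            return 1;
        });
        System.out.printf("ratio = %.2f%n", observed / (double) delayNanos);
    }
}
```

Sweeping `delayNanos` from zero upward and plotting the ratio for both mmap and bufferpool would show whether the gap is IO or application overhead.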
The main reason is that ForkJoin doesn't have a shared queue; each worker has its own queue. On submission it randomly distributes tasks to a worker; if a worker runs out of tasks, it steals from others. The downside is that ForkJoin doesn't have strict FIFO. Also, its queues are unbounded, so I added the inflightCount check before submitting, so that in case of slow IO we start dropping prefetch requests. inflightMap.size() isn't a constant-time check, so I use a dedicated atomic int to count. If we do want more FIFO, there are other queue options that provide higher throughput (e.g. the JCTools queues used by Netty).
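The in-flight cap described above can be sketched like this: an `AtomicInteger` (O(1), unlike `ConcurrentHashMap.size()`) gates submissions to an unbounded ForkJoinPool, so prefetch requests are dropped instead of piling up when IO is slow. Names and the cap value are illustrative, not the PR's code:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: bound in-flight prefetch loads with a dedicated counter, since the
// ForkJoinPool's own queues are unbounded.
public class BoundedPrefetch {

    private final ForkJoinPool pool = ForkJoinPool.commonPool();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final int maxInFlight;

    public BoundedPrefetch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // returns false if the request was dropped because too many loads are in flight
    public boolean submit(Runnable load) {
        if (inFlight.incrementAndGet() > maxInFlight) {
            inFlight.decrementAndGet();   // over the cap: undo and drop
            return false;
        }
        pool.execute(() -> {
            try {
                load.run();
            } finally {
                inFlight.decrementAndGet();
            }
        });
        return true;
    }

    public static void main(String[] args) {
        BoundedPrefetch p = new BoundedPrefetch(2);
        Runnable slow = () -> { try { Thread.sleep(200); } catch (InterruptedException ignored) {} };
        System.out.println(p.submit(slow)); // true
        System.out.println(p.submit(slow)); // true
        System.out.println(p.submit(slow)); // false — cap reached, request dropped
        p.pool.awaitQuiescence(1, TimeUnit.SECONDS);
    }
}
```

Because the counter is incremented synchronously in `submit`, the cap holds even before the pool has started running the first task.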
JMH Cold
Final comparison (both prewarmed, truly cold):
Key findings:
Setup
Benchmark Setup (per trial):
Per invocation (cold setup):
Call Path
Mmap call path:
* added L1BlockCache interface with a contains method
* the L1 Radix table can be swapped in later
* contains is a cheap array lookup, no pinning

Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
So the time to submit IO is more or less the same with bufferpool and mmap, ~40 microseconds. The issues are in the 'IO path', which should be covered by the cold-path optimizations for bufferpool.
Description
1. Async Prefetch Architecture
2. Prefetch Cache for Deduplication
3. Smart Block Loading
4. Prefetch Threadpool Configuration
5. API Changes
6. Test Updates
Key Architectural Improvements:
Related Issues
Resolves #119
Testing
jmh
Test setup in https://github.com/asimmahmood1/opensearch-storage-encryption/tree/jmhPrefetch
Sequential Prefetch: noKms vs jmhPrefetch (async)
OSB
Ran big5.
Full Run
Details
Check List
--signoff. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.