Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 38 additions & 28 deletions src/log_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
Entry.encoded_size)
end

module Scratch = struct
type t = { buffer : bytes (** [Bytes.length buf = Entry.encoded_size] *) }
[@@unboxed]

let create () = { buffer = Bytes.create Entry.encoded_size }
end

type t = {
io : IO.t; (** The disk file handler *)
append_io : string -> unit; (** Pre-allocated [IO.append io] closure *)
Expand All @@ -39,7 +46,6 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
two. *)
mutable bucket_count_log2 : int;
(** Invariant: equal to [log_2 (Array.length hashtbl)] *)
mutable scratch_buf : bytes;
mutable cardinal : int;
}

Expand All @@ -61,15 +67,17 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct

let flush ?no_callback ~with_fsync t = IO.flush ?no_callback ~with_fsync t.io

let key_of_offset t off =
let r = IO.read t.io ~off ~len:Key.encoded_size t.scratch_buf in
assert (r = Key.encoded_size);
fst (Entry.decode_key (Bytes.unsafe_to_string t.scratch_buf) 0)
let key_of_offset t (scratch : Scratch.t) off =
let len = Key.encoded_size in
let r = IO.read t.io ~off ~len scratch.buffer in
assert (r = len);
fst (Entry.decode_key (Bytes.unsafe_to_string scratch.buffer) 0)

let entry_of_offset t off =
let r = IO.read t.io ~off ~len:Entry.encoded_size t.scratch_buf in
let entry_of_offset t (scratch : Scratch.t) off =
let len = Entry.encoded_size in
let r = IO.read t.io ~off ~len scratch.buffer in
assert (r = Entry.encoded_size);
Entry.decode (Bytes.unsafe_to_string t.scratch_buf) 0
Entry.decode (Bytes.unsafe_to_string scratch.buffer) 0

let elt_index t key =
(* NOTE: we use the _uppermost_ bits of the key hash to index the bucket
Expand All @@ -78,7 +86,7 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
let unneeded_bits = Key.hash_size - t.bucket_count_log2 in
(Key.hash key lsr unneeded_bits) land ((1 lsl t.bucket_count_log2) - 1)

let resize t =
let resize t scratch =
(* Scale the number of hashtbl buckets. *)
t.bucket_count_log2 <- t.bucket_count_log2 + 1;
let new_bucket_count = 1 lsl t.bucket_count_log2 in
Expand All @@ -94,7 +102,7 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
let bucket_2i, bucket_2i_plus_1 =
Small_list.to_list bucket
|> List.partition (fun offset ->
let key = key_of_offset t offset in
let key = key_of_offset t scratch offset in
let new_index = elt_index t key in
assert (new_index lsr 1 = i);
new_index land 1 = 0)
Expand All @@ -105,8 +113,8 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct

(** Replace implementation that only updates in-memory state (and doesn't
write the binding to disk). *)
let replace_memory t key offset =
if t.cardinal > 2 * Array.length t.hashtbl then resize t;
let replace_memory t scratch key offset =
if t.cardinal > 2 * Array.length t.hashtbl then resize t scratch;
let elt_idx = elt_index t key in
let bucket = t.hashtbl.(elt_idx) in
let bucket =
Expand All @@ -117,7 +125,7 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
(* We ensure there's at most one binding for a given key *)
offset'
else
let key' = key_of_offset t offset' in
let key' = key_of_offset t scratch offset' in
match Key.equal key key' with
| false -> offset'
| true ->
Expand All @@ -139,10 +147,13 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
let replace t key value =
let offset = IO.offset t.io in
Entry.encode' key value t.append_io;
replace_memory t key offset
replace_memory t (Scratch.create ()) key offset

let sync_entries ~min t =
IO.iter_keys ~min (fun offset key -> replace_memory t key offset) t.io
let scratch = Scratch.create () in
IO.iter_keys ~min
(fun offset key -> replace_memory t scratch key offset)
t.io

let reload t =
clear_memory t;
Expand All @@ -160,25 +171,20 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
in
let hashtbl = Array.make bucket_count Small_list.empty in
let t =
{
io;
append_io = IO.append io;
hashtbl;
bucket_count_log2;
scratch_buf = Bytes.create Entry.encoded_size;
cardinal;
}
{ io; append_io = IO.append io; hashtbl; bucket_count_log2; cardinal }
in
IO.iter_keys (fun offset key -> replace_memory t key offset) io;
let scratch = Scratch.create () in
IO.iter_keys (fun offset key -> replace_memory t scratch key offset) io;
t

let find t key =
let elt_idx = elt_index t key in
let bucket = t.hashtbl.(elt_idx) in
let scratch = Scratch.create () in
Small_list.find_map bucket ~f:(fun offset ->
(* We expect the keys to match most of the time, so we decode the
value at the same time. *)
let entry = entry_of_offset t offset in
let entry = entry_of_offset t scratch offset in
match Key.equal key entry.key with
| false -> None
| true -> Some entry.value)
Expand All @@ -187,21 +193,25 @@ module Make (IO : Io.S) (Key : Data.Key) (Value : Data.Value) = struct
| Some x -> x

let fold t ~f ~init =
let scratch = Scratch.create () in
ArrayLabels.fold_left t.hashtbl ~init ~f:(fun acc bucket ->
Small_list.fold_left bucket ~init:acc ~f:(fun acc offset ->
let entry = entry_of_offset t offset in
let entry = entry_of_offset t scratch offset in
f acc entry))

let iter t ~f =
let scratch = Scratch.create () in
ArrayLabels.iter t.hashtbl ~f:(fun bucket ->
Small_list.iter bucket ~f:(fun offset -> f (entry_of_offset t offset)))
Small_list.iter bucket ~f:(fun offset ->
f (entry_of_offset t scratch offset)))

let to_sorted_seq t =
let scratch = Scratch.create () in
Array.to_seq t.hashtbl
|> Seq.flat_map (fun bucket ->
let arr =
Small_list.to_array bucket
|> Array.map (fun off -> entry_of_offset t off)
|> Array.map (fun off -> entry_of_offset t scratch off)
in
Array.sort Entry.compare arr;
Array.to_seq arr)
Expand Down