Skip to content

Commit 82f02c3

Browse files
committed
Support block compression API.
1 parent 422064f commit 82f02c3

File tree

10 files changed

+692
-43
lines changed

10 files changed

+692
-43
lines changed

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ This project attempts to support all of the features enumerated in the [LZ4 Fram
2626
- [Sparse](./pkg/sparse) write support
2727
- Random read access (see [caveats](#random-read-access))
2828

29-
29+
While the primary purpose of plz4 is to support parallel processing, the raw block API's have also been supported for cases where the payloads are very small and do not benefit from LZ4 Framing.
3030

3131
## Design
3232

@@ -57,6 +57,11 @@ There is another LZ4 Frame feature that is problematic at scale. By default, pl
5757

5858
Another advantage of independent blocks is the potential to support random read access. This is possible because each block can be independently decompressed. To support this, plz4 provides an optional progress callback that emits both the source offset and corresponding block offset during compression. An implementation can use this information to build lookup tables that can later be used to skip ahead during decompression to a known block offset. plz4 provides the 'WithReadOffset' option on the NewReader API to skip ahead and start decompression at a known block offset.
5959

60+
### CGO
61+
62+
63+
This package uses CGO to call the canonical LZ4 library which is written in C. There may be be cases where CGO is not desired, and in those cases the package also supports building with the environment variable "CGO_ENABLED=0". In general, the library runs a bit slower in that mode and not all features are available.
64+
6065

6166
## Install
6267

cmd/plz4/internal/ops/bakeoff.go

Lines changed: 127 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func RunBakeoff() error {
3131
)
3232

3333
// Consume into RAM; must be able to seek
34-
if rdr == os.Stdin {
34+
if rdr == os.Stdin || CLI.Bakeoff.RAM {
3535
var buf bytes.Buffer
3636
n, err := io.Copy(&buf, rdr)
3737
if err != nil {
@@ -125,16 +125,22 @@ func outputOptions() error {
125125
dict = CLI.Dict
126126
}
127127

128-
t.AppendRows([]table.Row{
129-
{"File name", fn},
130-
{"Dictionary", dict},
131-
{"Concurrency", CLI.Cpus},
132-
{"Block Size", CLI.Bakeoff.BS},
133-
{"Block Checksum", CLI.Bakeoff.BX},
134-
{"Blocks Linked", CLI.Bakeoff.BD},
135-
{"Content Checksum", CLI.Bakeoff.CS},
136-
{"Content Size", CLI.Bakeoff.CX},
137-
})
128+
if CLI.Bakeoff.BlockMode {
129+
t.AppendRows([]table.Row{
130+
{"File name", fn},
131+
})
132+
} else {
133+
t.AppendRows([]table.Row{
134+
{"File name", fn},
135+
{"Dictionary", dict},
136+
{"Concurrency", CLI.Cpus},
137+
{"Block Size", CLI.Bakeoff.BS},
138+
{"Block Checksum", CLI.Bakeoff.BX},
139+
{"Blocks Linked", CLI.Bakeoff.BD},
140+
{"Content Checksum", CLI.Bakeoff.CS},
141+
{"Content Size", CLI.Bakeoff.CX},
142+
})
143+
}
138144

139145
t.Render()
140146
return nil
@@ -143,8 +149,13 @@ func outputOptions() error {
143149
func outputResults(srcSz int64, plz4Results, lz4Results []resultT) error {
144150
fmt.Println()
145151

152+
mode := "frame mode"
153+
if CLI.Bakeoff.BlockMode {
154+
mode = "block mode"
155+
}
156+
146157
t := table.NewWriter()
147-
t.SetTitle("Bakeoff Results")
158+
t.SetTitle(fmt.Sprintf("Bakeoff Results [%s]", mode))
148159
t.SetStyle(table.StyleColoredBright)
149160
t.SetOutputMirror(os.Stdout)
150161
t.AppendHeader(table.Row{"Algo", "Level", "SrcSize", "Compressed", "Ratio", "Compress", "Decompress"})
@@ -208,6 +219,18 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err
208219

209220
opts = append(opts, lz4.OnBlockDoneOption(cbHandler))
210221

222+
var srcBlock []byte
223+
if CLI.Bakeoff.BlockMode {
224+
srcBlock, err = io.ReadAll(rd)
225+
if err != nil {
226+
return nil, err
227+
}
228+
if _, err := rd.Seek(0, io.SeekStart); err != nil {
229+
return nil, err
230+
}
231+
232+
}
233+
211234
bakeFunc := func() ([]resultT, error) {
212235
defer tr.MarkAsDone()
213236

@@ -216,19 +239,33 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err
216239
for ; i < 10; i++ {
217240
start := time.Now()
218241

219-
if _, err := rd.Seek(0, io.SeekStart); err != nil {
220-
return nil, err
221-
}
242+
var (
243+
split time.Time
244+
cnt int64
245+
err error
246+
)
222247

223-
// Last one wins; so append is ok.
224248
lvl, err := lz4Level(i)
225249
if err != nil {
226250
return nil, err
227251
}
228252

229-
opts = append(opts, lz4.CompressionLevelOption(lvl))
253+
if srcBlock != nil {
254+
// Block mode
255+
split, cnt, err = lz4BakeOneBlock(srcBlock, lvl)
256+
257+
} else {
258+
259+
if _, err := rd.Seek(0, io.SeekStart); err != nil {
260+
return nil, err
261+
}
262+
263+
// Last one wins; so append is ok.
264+
opts = append(opts, lz4.CompressionLevelOption(lvl))
265+
266+
split, cnt, err = lz4BakeOne(rd, opts...)
267+
}
230268

231-
split, cnt, err := lz4BakeOne(rd, opts...)
232269
if err != nil {
233270
return nil, err
234271
}
@@ -335,21 +372,44 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er
335372
plz4.WithWorkerPool(wp),
336373
)
337374

375+
var srcBlock []byte
376+
if CLI.Bakeoff.BlockMode {
377+
srcBlock, err = io.ReadAll(rd)
378+
if err != nil {
379+
return nil, err
380+
}
381+
if _, err := rd.Seek(0, io.SeekStart); err != nil {
382+
return nil, err
383+
}
384+
}
385+
338386
var results []resultT
339387

340388
for ; i < 12; i++ {
341389
start := time.Now()
342390

343-
if _, err := rd.Seek(0, io.SeekStart); err != nil {
344-
return nil, err
345-
}
346-
347-
// Last one wins; so append is ok.
348-
opts = append(opts,
349-
plz4.WithLevel(plz4.LevelT(i+1)),
391+
var (
392+
split time.Time
393+
cnt int64
394+
err error
350395
)
351396

352-
split, cnt, err := plz4BakeOne(rd, opts...)
397+
if srcBlock != nil {
398+
// Block mode
399+
split, cnt, err = plz4BakeOneBlock(srcBlock, plz4.LevelT(i+1))
400+
401+
} else {
402+
// Last one wins; so append is ok.
403+
404+
if _, err := rd.Seek(0, io.SeekStart); err != nil {
405+
return nil, err
406+
}
407+
408+
opts = append(opts,
409+
plz4.WithLevel(plz4.LevelT(i+1)),
410+
)
411+
split, cnt, err = plz4BakeOne(rd, opts...)
412+
}
353413
if err != nil {
354414
return nil, err
355415
}
@@ -373,6 +433,7 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er
373433
}
374434

375435
func plz4BakeOne(src io.Reader, opts ...plz4.OptT) (split time.Time, cnt int64, err error) {
436+
376437
var (
377438
fh *os.File
378439
wr io.Writer
@@ -445,6 +506,46 @@ func _plz4Decompress(rd io.Reader) error {
445506
return err
446507
}
447508

509+
func lz4BakeOneBlock(src []byte, level lz4.CompressionLevel) (split time.Time, cnt int64, err error) {
510+
511+
var (
512+
sz = lz4.CompressBlockBound(len(src))
513+
dst = make([]byte, sz)
514+
n int
515+
)
516+
517+
if level == lz4.Fast {
518+
n, err = lz4.CompressBlock(src, dst, nil)
519+
} else {
520+
n, err = lz4.CompressBlockHC(src, dst, level, nil, nil)
521+
}
522+
if err != nil {
523+
return
524+
}
525+
526+
dst = dst[:n]
527+
split = time.Now()
528+
cnt = int64(n)
529+
530+
tmp := make([]byte, len(src))
531+
532+
_, err = lz4.UncompressBlock(dst, tmp)
533+
return
534+
}
535+
536+
func plz4BakeOneBlock(src []byte, level plz4.LevelT) (split time.Time, cnt int64, err error) {
537+
538+
dst, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(level))
539+
if err != nil {
540+
return
541+
}
542+
543+
split = time.Now()
544+
_, err = plz4.DecompressBlock(dst)
545+
cnt = int64(len(dst))
546+
return
547+
}
548+
448549
func _lz4Decompress(rd io.Reader) error {
449550

450551
frd := lz4.NewReader(rd)

cmd/plz4/internal/ops/cli.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ var CLI struct {
2525
Skip bool `help:"Skip decompress" short:"s"`
2626
} `cmd:"" aliases:"v,ver" help:"Verify lz4 data"`
2727
Bakeoff struct {
28-
File string `optional:"" arg:"" type:"existingfile"`
29-
BS string `help:"Block size [4MB, 1MB, 256KB, 64KB]" default:"4MB"`
30-
BD bool `help:"Enable linked blocks"`
31-
BX bool `help:"Enable block checksum"`
32-
CX bool `help:"Enable content checksum"`
33-
CS bool `help:"Enable content size; fails on stdin"`
34-
RAM bool `help:"Process data in RAM"`
28+
File string `optional:"" arg:"" type:"existingfile"`
29+
BS string `help:"Block size [4MB, 1MB, 256KB, 64KB]" default:"4MB"`
30+
BD bool `help:"Enable linked blocks"`
31+
BX bool `help:"Enable block checksum"`
32+
CX bool `help:"Enable content checksum"`
33+
CS bool `help:"Enable content size; fails on stdin"`
34+
RAM bool `help:"Process data in RAM"`
35+
BlockMode bool `help:"Use block API instead of frame API" short:"B"`
3536
} `cmd:"" aliases:"b,bake" help:"Compare performance to github.com/pierrec/lz4"`
3637

3738
Cpus int `help:"Concurrency [0 synchronous] [-1 auto]" default:"-1" short:"c"`

internal/pkg/clz4/clz4.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ func byteSliceToCharPointer(b []byte) *C.char {
2424
return (*C.char)(unsafe.Pointer(&b[0]))
2525
}
2626

27+
func CompressBound(sz int) int {
28+
return int(C.LZ4_compressBound(C.int(sz)))
29+
}
30+
2731
func CompressFast(source, dest []byte, acceleration int) (int, error) {
2832
ret := int(C.LZ4_compress_fast(
2933
byteSliceToCharPointer(source),

internal/pkg/compress/compress.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,14 @@ func (f CompressorFactory) newIndie() Compressor {
6868
}
6969

7070
func (f CompressorFactory) newLinked() Compressor {
71-
switch {
72-
case f.level == 1:
71+
switch f.level {
72+
case 1:
7373
return newLinkedCompressor(f.dictCtx)
7474
default:
7575
return newLinkedCompressorHC(f.level, f.dictCtxHC)
7676
}
7777
}
78+
79+
func CompressBound(sz int) int {
80+
return clz4.CompressBound(sz)
81+
}

internal/pkg/compress/nocgo_compress.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,27 @@ func (f CompressorFactory) NewCompressor() Compressor {
4747
}
4848

4949
type fastCompressor struct {
50-
cmp lz4.Compressor
5150
}
5251

5352
func (c *fastCompressor) Compress(src, dst, dict []byte) (int, error) {
54-
return c.cmp.CompressBlock(src, dst)
53+
return lz4.CompressBlock(src, dst, nil)
5554
}
5655

5756
func NewCompressorHC(level int) Compressor {
5857
if level > 9 {
5958
level = 9
6059
}
6160
return &hcCompressor{
62-
cmp: lz4.CompressorHC{
63-
Level: lz4Level(level),
64-
},
61+
level: lz4Level(level),
6562
}
6663
}
6764

6865
type hcCompressor struct {
69-
cmp lz4.CompressorHC
66+
level lz4.CompressionLevel
7067
}
7168

7269
func (c *hcCompressor) Compress(src, dst, dict []byte) (int, error) {
73-
return c.cmp.CompressBlock(src, dst)
70+
return lz4.CompressBlockHC(src, dst, c.level, nil, nil)
7471
}
7572

7673
type failedCompressor struct {
@@ -110,3 +107,7 @@ func lz4Level(l int) lz4.CompressionLevel {
110107
}
111108
return lz4Level
112109
}
110+
111+
func CompressBound(sz int) int {
112+
return lz4.CompressBlockBound(sz)
113+
}

0 commit comments

Comments
 (0)