Commit c81b5c7

buffer: Use a double-buffering scheme to prevent data races
1 parent 6e2f60e commit c81b5c7

File tree

2 files changed: +38 −13 lines


buffer.go

+36 −13
@@ -15,47 +15,70 @@ import (
 )
 
 const defaultBufSize = 4096
+const maxCachedBufSize = 16 * 1024
 
 // A buffer which is used for both reading and writing.
 // This is possible since communication on each connection is synchronous.
 // In other words, we can't write and read simultaneously on the same connection.
 // The buffer is similar to bufio.Reader / Writer but zero-copy-ish
 // Also highly optimized for this particular use case.
+// This buffer is backed by two byte slices in a double-buffering scheme
 type buffer struct {
 	buf     []byte // buf is a byte buffer who's length and capacity are equal.
 	nc      net.Conn
 	idx     int
 	length  int
 	timeout time.Duration
+	dbuf    [2][]byte // dbuf is an array with the two byte slices that back this buffer
+	flipcnt uint      // flipccnt is the current buffer counter for double-buffering
 }
 
 // newBuffer allocates and returns a new buffer.
 func newBuffer(nc net.Conn) buffer {
+	fg := make([]byte, defaultBufSize)
+	bg := make([]byte, defaultBufSize)
 	return buffer{
-		buf: make([]byte, defaultBufSize),
-		nc:  nc,
+		buf:  fg,
+		nc:   nc,
+		dbuf: [2][]byte{fg, bg},
 	}
 }
 
+// flip replaces the active buffer with the background buffer
+// this is a delayed flip that simply increases the buffer counter;
+// the actual flip will be performed the next time we call `buffer.fill`
+func (b *buffer) flip() {
+	b.flipcnt += 1
+}
+
 // fill reads into the buffer until at least _need_ bytes are in it
 func (b *buffer) fill(need int) error {
 	n := b.length
+	// fill data into its double-buffering target: if we've called
+	// flip on this buffer, we'll be copying to the background buffer,
+	// and then filling it with network data; otherwise we'll just move
+	// the contents of the current buffer to the front before filling it
+	dest := b.dbuf[b.flipcnt&1]
+
+	// grow buffer if necessary to fit the whole packet.
+	if need > len(dest) {
+		// Round up to the next multiple of the default size
+		dest = make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
 
-	// move existing data to the beginning
-	if n > 0 && b.idx > 0 {
-		copy(b.buf[0:n], b.buf[b.idx:])
+		// if the allocated buffer is not too large, move it to backing storage
+		// to prevent extra allocations on applications that perform large reads
+		if len(dest) <= maxCachedBufSize {
+			b.dbuf[b.flipcnt&1] = dest
+		}
 	}
 
-	// grow buffer if necessary
-	// TODO: let the buffer shrink again at some point
-	// Maybe keep the org buf slice and swap back?
-	if need > len(b.buf) {
-		// Round up to the next multiple of the default size
-		newBuf := make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
-		copy(newBuf, b.buf)
-		b.buf = newBuf
+	// if we're filling the fg buffer, move the existing data to the start of it.
+	// if we're filling the bg buffer, copy over the data
+	if n > 0 {
+		copy(dest[0:n], b.buf[b.idx:])
 	}
 
+	b.buf = dest
 	b.idx = 0
 
 	for {

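For reference, a minimal, self-contained sketch of the double-buffering scheme above, outside the driver. The names here (doubleBuf, newDoubleBuf) and the byte-slice fill are hypothetical stand-ins for illustration only: the real buffer.fill reads from the net.Conn and grows its target slice as shown in the diff. The mechanics are the same, though: flip only bumps a counter, flipcnt&1 selects the fill target, so the next fill lands in the other backing slice and data still referenced from the previously active slice is left intact.

package main

import "fmt"

// doubleBuf is a stripped-down illustration: two fixed backing slices, a
// counter whose low bit selects the fill target, and a deferred flip that
// only takes effect on the next fill.
type doubleBuf struct {
	buf     []byte    // currently active slice (what readers see)
	dbuf    [2][]byte // the two backing slices
	flipcnt uint      // low bit selects the fill target
}

func newDoubleBuf(size int) *doubleBuf {
	fg := make([]byte, size)
	bg := make([]byte, size)
	return &doubleBuf{buf: fg, dbuf: [2][]byte{fg, bg}}
}

// flip schedules a switch to the other backing slice; nothing is copied here.
func (b *doubleBuf) flip() { b.flipcnt++ }

// fill writes src into the current fill target and makes it the active slice.
// Because a flip changes the target, slices handed out from the previously
// active buffer are not overwritten by this fill.
func (b *doubleBuf) fill(src []byte) {
	dest := b.dbuf[b.flipcnt&1]
	copy(dest, src)
	b.buf = dest[:len(src)]
}

func main() {
	b := newDoubleBuf(16)
	b.fill([]byte("result set #1"))
	old := b.buf // imagine a caller still holding a reference into this slice

	b.flip()                        // e.g. when the rows are closed
	b.fill([]byte("result set #2")) // goes into the other backing slice

	fmt.Println(string(old))   // "result set #1" is still intact
	fmt.Println(string(b.buf)) // "result set #2"
}
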
rows.go

+2
@@ -111,6 +111,8 @@ func (rows *mysqlRows) Close() (err error) {
 		return err
 	}
 
+	mc.buf.flip()
+
 	// Remove unread packets from stream
 	if !rows.rs.done {
 		err = mc.readUntilEOF()

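As a usage-level illustration (an assumption about intent, not something stated in the diff): rows.Close() drains any unread packets via readUntilEOF, which refills the connection buffer, while values scanned into sql.RawBytes alias that buffer rather than copying it. With the flip above, that drain should land in the other backing slice instead of on top of such bytes. The sketch below is hedged: the DSN, table, and column are placeholders, and the database/sql docs still say RawBytes is valid only until the next Next, Scan, or Close.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Placeholder DSN; adjust user, password and database for a real setup.
	db, err := sql.Open("mysql", "user:password@/dbname")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Placeholder query; assume it returns more rows than we read.
	rows, err := db.Query("SELECT name FROM items")
	if err != nil {
		log.Fatal(err)
	}

	// sql.RawBytes aliases the driver's connection buffer instead of copying.
	var name sql.RawBytes
	if rows.Next() {
		if err := rows.Scan(&name); err != nil {
			log.Fatal(err)
		}
	}

	// Closing with unread rows makes the driver drain the remaining packets,
	// refilling the connection buffer. The flip in Close() means that refill
	// should go into the other backing slice rather than over the bytes that
	// `name` still points at. (For illustration only: copy RawBytes if you
	// need it after Close.)
	rows.Close()
	fmt.Println(string(name))
}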