Skip to content

Commit f93a990

Browse files
GODRIVER-2520 Remove deadline setters from gridfs (#1427)
1 parent 5ca3832 commit f93a990

9 files changed

+282
-307
lines changed

mongo/gridfs/bucket.go

Lines changed: 123 additions & 170 deletions
Large diffs are not rendered by default.

mongo/gridfs/bucket_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (C) MongoDB, Inc. 2023-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package gridfs
8+
9+
import (
10+
"context"
11+
"testing"
12+
13+
"go.mongodb.org/mongo-driver/bson"
14+
"go.mongodb.org/mongo-driver/internal/assert"
15+
"go.mongodb.org/mongo-driver/internal/integtest"
16+
"go.mongodb.org/mongo-driver/mongo"
17+
"go.mongodb.org/mongo-driver/mongo/options"
18+
)
19+
20+
func TestBucket_openDownloadStream(t *testing.T) {
21+
tests := []struct {
22+
name string
23+
filter interface{}
24+
err error
25+
}{
26+
{
27+
name: "nil filter",
28+
filter: nil,
29+
err: mongo.ErrNilDocument,
30+
},
31+
{
32+
name: "nonmatching filter",
33+
filter: bson.D{{"x", 1}},
34+
err: ErrFileNotFound,
35+
},
36+
}
37+
38+
cs := integtest.ConnString(t)
39+
clientOpts := options.Client().ApplyURI(cs.Original)
40+
41+
client, err := mongo.Connect(context.Background(), clientOpts)
42+
assert.Nil(t, err, "Connect error: %v", err)
43+
44+
db := client.Database("bucket")
45+
46+
for _, test := range tests {
47+
t.Run(test.name, func(t *testing.T) {
48+
bucket, err := NewBucket(db)
49+
assert.NoError(t, err)
50+
51+
_, err = bucket.openDownloadStream(context.Background(), test.filter)
52+
assert.ErrorIs(t, err, test.err)
53+
})
54+
}
55+
}

mongo/gridfs/download_stream.go

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ type DownloadStream struct {
3737
bufferStart int
3838
bufferEnd int
3939
expectedChunk int32 // index of next expected chunk
40-
readDeadline time.Time
4140
fileLen int64
41+
ctx context.Context
4242

4343
// The pointer returned by GetFile. This should not be used in the actual DownloadStream code outside of the
4444
// newDownloadStream constructor because the values can be mutated by the user after calling GetFile. Instead,
@@ -94,7 +94,7 @@ func newFileFromResponse(resp findFileResponse) *File {
9494
}
9595
}
9696

97-
func newDownloadStream(cursor *mongo.Cursor, chunkSize int32, file *File) *DownloadStream {
97+
func newDownloadStream(ctx context.Context, cursor *mongo.Cursor, chunkSize int32, file *File) *DownloadStream {
9898
numChunks := int32(math.Ceil(float64(file.Length) / float64(chunkSize)))
9999

100100
return &DownloadStream{
@@ -105,6 +105,7 @@ func newDownloadStream(cursor *mongo.Cursor, chunkSize int32, file *File) *Downl
105105
done: cursor == nil,
106106
fileLen: file.Length,
107107
file: file,
108+
ctx: ctx,
108109
}
109110
}
110111

@@ -121,16 +122,6 @@ func (ds *DownloadStream) Close() error {
121122
return nil
122123
}
123124

124-
// SetReadDeadline sets the read deadline for this download stream.
125-
func (ds *DownloadStream) SetReadDeadline(t time.Time) error {
126-
if ds.closed {
127-
return ErrStreamClosed
128-
}
129-
130-
ds.readDeadline = t
131-
return nil
132-
}
133-
134125
// Read reads the file from the server and writes it to a destination byte slice.
135126
func (ds *DownloadStream) Read(p []byte) (int, error) {
136127
if ds.closed {
@@ -141,17 +132,12 @@ func (ds *DownloadStream) Read(p []byte) (int, error) {
141132
return 0, io.EOF
142133
}
143134

144-
ctx, cancel := deadlineContext(ds.readDeadline)
145-
if cancel != nil {
146-
defer cancel()
147-
}
148-
149135
bytesCopied := 0
150136
var err error
151137
for bytesCopied < len(p) {
152138
if ds.bufferStart >= ds.bufferEnd {
153139
// Buffer is empty and can load in data from new chunk.
154-
err = ds.fillBuffer(ctx)
140+
err = ds.fillBuffer(ds.ctx)
155141
if err != nil {
156142
if err == errNoMoreChunks {
157143
if bytesCopied == 0 {
@@ -183,18 +169,13 @@ func (ds *DownloadStream) Skip(skip int64) (int64, error) {
183169
return 0, nil
184170
}
185171

186-
ctx, cancel := deadlineContext(ds.readDeadline)
187-
if cancel != nil {
188-
defer cancel()
189-
}
190-
191172
var skipped int64
192173
var err error
193174

194175
for skipped < skip {
195176
if ds.bufferStart >= ds.bufferEnd {
196177
// Buffer is empty and can load in data from new chunk.
197-
err = ds.fillBuffer(ctx)
178+
err = ds.fillBuffer(ds.ctx)
198179
if err != nil {
199180
if err == errNoMoreChunks {
200181
return skipped, nil

mongo/gridfs/gridfs_examples_test.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@ func ExampleBucket_OpenUploadStream() {
2828
// collection document.
2929
uploadOpts := options.GridFSUpload().
3030
SetMetadata(bson.D{{"metadata tag", "tag"}})
31-
uploadStream, err := bucket.OpenUploadStream("filename", uploadOpts)
31+
32+
// Use WithContext to force a timeout if the upload does not succeed in
33+
// 2 seconds.
34+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
35+
defer cancel()
36+
37+
uploadStream, err := bucket.OpenUploadStream(ctx, "filename", uploadOpts)
3238
if err != nil {
3339
log.Fatal(err)
3440
}
@@ -38,13 +44,6 @@ func ExampleBucket_OpenUploadStream() {
3844
}
3945
}()
4046

41-
// Use SetWriteDeadline to force a timeout if the upload does not succeed in
42-
// 2 seconds.
43-
err = uploadStream.SetWriteDeadline(time.Now().Add(2 * time.Second))
44-
if err != nil {
45-
log.Fatal(err)
46-
}
47-
4847
if _, err = uploadStream.Write(fileContent); err != nil {
4948
log.Fatal(err)
5049
}
@@ -59,6 +58,7 @@ func ExampleBucket_UploadFromStream() {
5958
uploadOpts := options.GridFSUpload().
6059
SetMetadata(bson.D{{"metadata tag", "tag"}})
6160
fileID, err := bucket.UploadFromStream(
61+
context.Background(),
6262
"filename",
6363
bytes.NewBuffer(fileContent),
6464
uploadOpts)
@@ -73,7 +73,12 @@ func ExampleBucket_OpenDownloadStream() {
7373
var bucket *gridfs.Bucket
7474
var fileID primitive.ObjectID
7575

76-
downloadStream, err := bucket.OpenDownloadStream(fileID)
76+
// Use WithContext to force a timeout if the download does not succeed in
77+
// 2 seconds.
78+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
79+
defer cancel()
80+
81+
downloadStream, err := bucket.OpenDownloadStream(ctx, fileID)
7782
if err != nil {
7883
log.Fatal(err)
7984
}
@@ -83,13 +88,6 @@ func ExampleBucket_OpenDownloadStream() {
8388
}
8489
}()
8590

86-
// Use SetReadDeadline to force a timeout if the download does not succeed
87-
// in 2 seconds.
88-
err = downloadStream.SetReadDeadline(time.Now().Add(2 * time.Second))
89-
if err != nil {
90-
log.Fatal(err)
91-
}
92-
9391
fileBuffer := bytes.NewBuffer(nil)
9492
if _, err := io.Copy(fileBuffer, downloadStream); err != nil {
9593
log.Fatal(err)
@@ -100,8 +98,10 @@ func ExampleBucket_DownloadToStream() {
10098
var bucket *gridfs.Bucket
10199
var fileID primitive.ObjectID
102100

101+
ctx := context.Background()
102+
103103
fileBuffer := bytes.NewBuffer(nil)
104-
if _, err := bucket.DownloadToStream(fileID, fileBuffer); err != nil {
104+
if _, err := bucket.DownloadToStream(ctx, fileID, fileBuffer); err != nil {
105105
log.Fatal(err)
106106
}
107107
}
@@ -110,7 +110,7 @@ func ExampleBucket_Delete() {
110110
var bucket *gridfs.Bucket
111111
var fileID primitive.ObjectID
112112

113-
if err := bucket.Delete(fileID); err != nil {
113+
if err := bucket.Delete(context.Background(), fileID); err != nil {
114114
log.Fatal(err)
115115
}
116116
}
@@ -122,7 +122,7 @@ func ExampleBucket_Find() {
122122
filter := bson.D{
123123
{"length", bson.D{{"$gt", 1000}}},
124124
}
125-
cursor, err := bucket.Find(filter)
125+
cursor, err := bucket.Find(context.Background(), filter)
126126
if err != nil {
127127
log.Fatal(err)
128128
}
@@ -150,15 +150,17 @@ func ExampleBucket_Rename() {
150150
var bucket *gridfs.Bucket
151151
var fileID primitive.ObjectID
152152

153-
if err := bucket.Rename(fileID, "new file name"); err != nil {
153+
ctx := context.Background()
154+
155+
if err := bucket.Rename(ctx, fileID, "new file name"); err != nil {
154156
log.Fatal(err)
155157
}
156158
}
157159

158160
func ExampleBucket_Drop() {
159161
var bucket *gridfs.Bucket
160162

161-
if err := bucket.Drop(); err != nil {
163+
if err := bucket.Drop(context.Background()); err != nil {
162164
log.Fatal(err)
163165
}
164166
}

mongo/gridfs/gridfs_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestGridFS(t *testing.T) {
8181
bucket, err := NewBucket(db, tt.bucketOpts)
8282
assert.Nil(t, err, "NewBucket error: %v", err)
8383

84-
us, err := bucket.OpenUploadStream("filename", tt.uploadOpts)
84+
us, err := bucket.OpenUploadStream(context.Background(), "filename", tt.uploadOpts)
8585
assert.Nil(t, err, "OpenUploadStream error: %v", err)
8686

8787
expectedBucketChunkSize := DefaultChunkSize

mongo/gridfs/upload_stream.go

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,25 @@ type UploadStream struct {
3333
*Upload // chunk size and metadata
3434
FileID interface{}
3535

36-
chunkIndex int
37-
chunksColl *mongo.Collection // collection to store file chunks
38-
filename string
39-
filesColl *mongo.Collection // collection to store file metadata
40-
closed bool
41-
buffer []byte
42-
bufferIndex int
43-
fileLen int64
44-
writeDeadline time.Time
36+
chunkIndex int
37+
chunksColl *mongo.Collection // collection to store file chunks
38+
filename string
39+
filesColl *mongo.Collection // collection to store file metadata
40+
closed bool
41+
buffer []byte
42+
bufferIndex int
43+
fileLen int64
44+
ctx context.Context
4545
}
4646

4747
// NewUploadStream creates a new upload stream.
48-
func newUploadStream(upload *Upload, fileID interface{}, filename string, chunks, files *mongo.Collection) *UploadStream {
48+
func newUploadStream(
49+
ctx context.Context,
50+
upload *Upload,
51+
fileID interface{},
52+
filename string,
53+
chunks, files *mongo.Collection,
54+
) *UploadStream {
4955
return &UploadStream{
5056
Upload: upload,
5157
FileID: fileID,
@@ -54,6 +60,7 @@ func newUploadStream(upload *Upload, fileID interface{}, filename string, chunks
5460
filename: filename,
5561
filesColl: files,
5662
buffer: make([]byte, UploadBufferSize),
63+
ctx: ctx,
5764
}
5865
}
5966

@@ -63,49 +70,27 @@ func (us *UploadStream) Close() error {
6370
return ErrStreamClosed
6471
}
6572

66-
ctx, cancel := deadlineContext(us.writeDeadline)
67-
if cancel != nil {
68-
defer cancel()
69-
}
70-
7173
if us.bufferIndex != 0 {
72-
if err := us.uploadChunks(ctx, true); err != nil {
74+
if err := us.uploadChunks(us.ctx, true); err != nil {
7375
return err
7476
}
7577
}
7678

77-
if err := us.createFilesCollDoc(ctx); err != nil {
79+
if err := us.createFilesCollDoc(us.ctx); err != nil {
7880
return err
7981
}
8082

8183
us.closed = true
8284
return nil
8385
}
8486

85-
// SetWriteDeadline sets the write deadline for this stream.
86-
func (us *UploadStream) SetWriteDeadline(t time.Time) error {
87-
if us.closed {
88-
return ErrStreamClosed
89-
}
90-
91-
us.writeDeadline = t
92-
return nil
93-
}
94-
9587
// Write transfers the contents of a byte slice into this upload stream. If the stream's underlying buffer fills up,
9688
// the buffer will be uploaded as chunks to the server. Implements the io.Writer interface.
9789
func (us *UploadStream) Write(p []byte) (int, error) {
9890
if us.closed {
9991
return 0, ErrStreamClosed
10092
}
10193

102-
var ctx context.Context
103-
104-
ctx, cancel := deadlineContext(us.writeDeadline)
105-
if cancel != nil {
106-
defer cancel()
107-
}
108-
10994
origLen := len(p)
11095
for {
11196
if len(p) == 0 {
@@ -117,7 +102,7 @@ func (us *UploadStream) Write(p []byte) (int, error) {
117102
us.bufferIndex += n
118103

119104
if us.bufferIndex == UploadBufferSize {
120-
err := us.uploadChunks(ctx, false)
105+
err := us.uploadChunks(us.ctx, false)
121106
if err != nil {
122107
return 0, err
123108
}
@@ -132,12 +117,7 @@ func (us *UploadStream) Abort() error {
132117
return ErrStreamClosed
133118
}
134119

135-
ctx, cancel := deadlineContext(us.writeDeadline)
136-
if cancel != nil {
137-
defer cancel()
138-
}
139-
140-
_, err := us.chunksColl.DeleteMany(ctx, bson.D{{"files_id", us.FileID}})
120+
_, err := us.chunksColl.DeleteMany(us.ctx, bson.D{{"files_id", us.FileID}})
141121
if err != nil {
142122
return err
143123
}

mongo/integration/crud_helpers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,7 +1233,7 @@ func executeGridFSDownload(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (i
12331233
}
12341234
}
12351235

1236-
return bucket.DownloadToStream(fileID, new(bytes.Buffer))
1236+
return bucket.DownloadToStream(context.Background(), fileID, new(bytes.Buffer))
12371237
}
12381238

12391239
func executeGridFSDownloadByName(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (int64, error) {
@@ -1253,7 +1253,7 @@ func executeGridFSDownloadByName(mt *mtest.T, bucket *gridfs.Bucket, args bson.R
12531253
}
12541254
}
12551255

1256-
return bucket.DownloadToStreamByName(file, new(bytes.Buffer))
1256+
return bucket.DownloadToStreamByName(context.Background(), file, new(bytes.Buffer))
12571257
}
12581258

12591259
func executeCreateIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (string, error) {

0 commit comments

Comments
 (0)