diff --git a/.travis.yml b/.travis.yml index c1cc10aaf..b89d7e11b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ go: - 1.5 - 1.6 - 1.7 + - 1.8 - tip before_script: diff --git a/README.md b/README.md index 645203850..f0990ae84 100644 --- a/README.md +++ b/README.md @@ -412,6 +412,11 @@ Version 1.0 of the driver recommended adding `&charset=utf8` (alias for `SET NAM See http://dev.mysql.com/doc/refman/5.7/en/charset-unicode.html for more details on MySQL's Unicode support. +## Context Support +Since go1.8, context is introduced to `database/sql` for better control on timeout and cancellation. +New interfaces such as `driver.QueryerContext`, `driver.ExecerContext` are introduced. See more details on [context support to database/sql package](https://golang.org/doc/go1.8#database_sql, "sql"). + +In Go-MySQL-Driver, we implemented these interfaces for structs `mysqlConn`, `mysqlStmt` and `mysqlTx`. ## Testing / Development To run the driver tests you may need to adjust the configuration. See the [Testing Wiki-Page](https://github.com/go-sql-driver/mysql/wiki/Testing "Testing") for details. diff --git a/connection.go b/connection.go index d82c728f3..816586048 100644 --- a/connection.go +++ b/connection.go @@ -1,3 +1,5 @@ +// +build !go1.8 + // Go MySQL Driver - A MySQL-Driver for Go's database/sql package // // Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. diff --git a/connection_ctx.go b/connection_ctx.go new file mode 100644 index 000000000..f92cf7bcb --- /dev/null +++ b/connection_ctx.go @@ -0,0 +1,418 @@ +// +build go1.8 + +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import ( + "context" + "database/sql/driver" + "net" + "strconv" + "strings" + "time" +) + +type mysqlConn struct { + buf buffer + netConn net.Conn + affectedRows uint64 + insertId uint64 + cfg *Config + maxAllowedPacket int + maxWriteSize int + writeTimeout time.Duration + flags clientFlag + status statusFlag + sequence uint8 + parseTime bool + strict bool +} + +// Handles parameters set in DSN after the connection is established +func (mc *mysqlConn) handleParams() (err error) { + for param, val := range mc.cfg.Params { + switch param { + // Charset + case "charset": + charsets := strings.Split(val, ",") + for i := range charsets { + // ignore errors here - a charset may not exist + err = mc.exec(context.Background(), "SET NAMES "+charsets[i]) + if err == nil { + break + } + } + if err != nil { + return + } + + // System Vars + default: + err = mc.exec(context.Background(), "SET "+param+"="+val+"") + if err != nil { + return + } + } + } + + return +} + +// Begin implements driver.Conn interface +func (mc *mysqlConn) Begin() (driver.Tx, error) { + return mc.BeginTx(context.Background(), driver.TxOptions{}) +} + +// Ping implements drvier.Pinger interface +func (mc *mysqlConn) Ping(ctx context.Context) error { + if mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return driver.ErrBadConn + } + if err := mc.writeCommandPacket(ctx, comPing); err != nil { + errLog.Print(err) + return err + } + + if _, err := mc.readResultOK(); err != nil { + errLog.Print(err) + return err + } + return nil +} + +// BeginTx implements driver.ConnBeginTx interface +func (mc *mysqlConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { + if mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return nil, driver.ErrBadConn + } + err := mc.exec(ctx, "START TRANSACTION") + if err == nil { + return &mysqlTx{mc}, err + } + + return nil, err +} + +func (mc *mysqlConn) Close() (err error) { + // Makes Close idempotent + if mc.netConn != nil { + err = mc.writeCommandPacket(context.Background(), comQuit) + } + + mc.cleanup() + + return +} + +// Closes the network connection and unsets internal variables. Do not call this +// function after successfully authentication, call Close instead. This function +// is called before auth or on auth failure because MySQL will have already +// closed the network connection. +func (mc *mysqlConn) cleanup() { + // Makes cleanup idempotent + if mc.netConn != nil { + if err := mc.netConn.Close(); err != nil { + errLog.Print(err) + } + mc.netConn = nil + } + mc.cfg = nil + mc.buf.nc = nil +} + +func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) { + return mc.PrepareContext(context.Background(), query) +} + +func (mc *mysqlConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { + if mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return nil, driver.ErrBadConn + } + // Send command + err := mc.writeCommandPacketStr(ctx, comStmtPrepare, query) + if err != nil { + return nil, err + } + + stmt := &mysqlStmt{ + mc: mc, + } + + // Read Result + columnCount, err := stmt.readPrepareResultPacket() + if err == nil { + if stmt.paramCount > 0 { + if err = mc.readUntilEOF(); err != nil { + return nil, err + } + } + + if columnCount > 0 { + err = mc.readUntilEOF() + } + } + + return stmt, err +} + +func (mc *mysqlConn) interpolateParams(query string, args []driver.Value) (string, error) { + // Number of ? should be same to len(args) + if strings.Count(query, "?") != len(args) { + return "", driver.ErrSkip + } + + buf := mc.buf.takeCompleteBuffer() + if buf == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return "", driver.ErrBadConn + } + buf = buf[:0] + argPos := 0 + + for i := 0; i < len(query); i++ { + q := strings.IndexByte(query[i:], '?') + if q == -1 { + buf = append(buf, query[i:]...) + break + } + buf = append(buf, query[i:i+q]...) + i += q + + arg := args[argPos] + argPos++ + + if arg == nil { + buf = append(buf, "NULL"...) + continue + } + + switch v := arg.(type) { + case int64: + buf = strconv.AppendInt(buf, v, 10) + case float64: + buf = strconv.AppendFloat(buf, v, 'g', -1, 64) + case bool: + if v { + buf = append(buf, '1') + } else { + buf = append(buf, '0') + } + case time.Time: + if v.IsZero() { + buf = append(buf, "'0000-00-00'"...) + } else { + v := v.In(mc.cfg.Loc) + v = v.Add(time.Nanosecond * 500) // To round under microsecond + year := v.Year() + year100 := year / 100 + year1 := year % 100 + month := v.Month() + day := v.Day() + hour := v.Hour() + minute := v.Minute() + second := v.Second() + micro := v.Nanosecond() / 1000 + + buf = append(buf, []byte{ + '\'', + digits10[year100], digits01[year100], + digits10[year1], digits01[year1], + '-', + digits10[month], digits01[month], + '-', + digits10[day], digits01[day], + ' ', + digits10[hour], digits01[hour], + ':', + digits10[minute], digits01[minute], + ':', + digits10[second], digits01[second], + }...) + + if micro != 0 { + micro10000 := micro / 10000 + micro100 := micro / 100 % 100 + micro1 := micro % 100 + buf = append(buf, []byte{ + '.', + digits10[micro10000], digits01[micro10000], + digits10[micro100], digits01[micro100], + digits10[micro1], digits01[micro1], + }...) + } + buf = append(buf, '\'') + } + case []byte: + if v == nil { + buf = append(buf, "NULL"...) + } else { + buf = append(buf, "_binary'"...) + if mc.status&statusNoBackslashEscapes == 0 { + buf = escapeBytesBackslash(buf, v) + } else { + buf = escapeBytesQuotes(buf, v) + } + buf = append(buf, '\'') + } + case string: + buf = append(buf, '\'') + if mc.status&statusNoBackslashEscapes == 0 { + buf = escapeStringBackslash(buf, v) + } else { + buf = escapeStringQuotes(buf, v) + } + buf = append(buf, '\'') + default: + return "", driver.ErrSkip + } + + if len(buf)+4 > mc.maxAllowedPacket { + return "", driver.ErrSkip + } + } + if argPos != len(args) { + return "", driver.ErrSkip + } + return string(buf), nil +} + +func (mc *mysqlConn) Exec(query string, args []driver.Value) (driver.Result, error) { + return mc.ExecContext(context.Background(), query, args) +} + +func (mc *mysqlConn) ExecContext(ctx context.Context, query string, args []driver.Value) (driver.Result, error) { + if mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return nil, driver.ErrBadConn + } + if len(args) != 0 { + if !mc.cfg.InterpolateParams { + return nil, driver.ErrSkip + } + // try to interpolate the parameters to save extra roundtrips for preparing and closing a statement + prepared, err := mc.interpolateParams(query, args) + if err != nil { + return nil, err + } + query = prepared + args = nil + } + mc.affectedRows = 0 + mc.insertId = 0 + + err := mc.exec(ctx, query) + if err == nil { + return &mysqlResult{ + affectedRows: int64(mc.affectedRows), + insertId: int64(mc.insertId), + }, err + } + return nil, err +} + +// Internal function to execute commands +func (mc *mysqlConn) exec(ctx context.Context, query string) error { + // Send command + err := mc.writeCommandPacketStr(ctx, comQuery, query) + if err != nil { + return err + } + + // Read Result + resLen, err := mc.readResultSetHeaderPacket() + if err == nil && resLen > 0 { + if err = mc.readUntilEOF(); err != nil { + return err + } + + err = mc.readUntilEOF() + } + + return err +} + +// Query implements driver.Queryer interface +func (mc *mysqlConn) Query(query string, args []driver.Value) (driver.Rows, error) { + return mc.QueryContext(context.Background(), query, args) +} + +// QueryContext implements driver.QueryerContext interface +func (mc *mysqlConn) QueryContext(ctx context.Context, query string, args []driver.Value) (driver.Rows, error) { + if mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return nil, driver.ErrBadConn + } + if len(args) != 0 { + if !mc.cfg.InterpolateParams { + return nil, driver.ErrSkip + } + // try client-side prepare to reduce roundtrip + prepared, err := mc.interpolateParams(query, args) + if err != nil { + return nil, err + } + query = prepared + args = nil + } + // Send command + err := mc.writeCommandPacketStr(ctx, comQuery, query) + if err == nil { + // Read Result + var resLen int + resLen, err = mc.readResultSetHeaderPacket() + if err == nil { + rows := new(textRows) + rows.mc = mc + + if resLen == 0 { + // no columns, no more data + return emptyRows{}, nil + } + // Columns + rows.columns, err = mc.readColumns(resLen) + return rows, err + } + } + return nil, err +} + +// Gets the value of the given MySQL System Variable +// The returned byte slice is only valid until the next read +func (mc *mysqlConn) getSystemVar(name string) ([]byte, error) { + // Send command + if err := mc.writeCommandPacketStr(context.Background(), comQuery, "SELECT @@"+name); err != nil { + return nil, err + } + + // Read Result + resLen, err := mc.readResultSetHeaderPacket() + if err == nil { + rows := new(textRows) + rows.mc = mc + rows.columns = []mysqlField{{fieldType: fieldTypeVarChar}} + + if resLen > 0 { + // Columns + if err := mc.readUntilEOF(); err != nil { + return nil, err + } + } + + dest := make([]driver.Value, resLen) + if err = rows.readRow(dest); err == nil { + return dest[0].([]byte), mc.readUntilEOF() + } + } + return nil, err +} diff --git a/driver.go b/driver.go index 0022d1f1e..d94b4f0f3 100644 --- a/driver.go +++ b/driver.go @@ -1,3 +1,5 @@ +// +build !go1.8 + // Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. // // This Source Code Form is subject to the terms of the Mozilla Public @@ -14,6 +16,7 @@ // db, err := sql.Open("mysql", "user:password@/dbname") // // See https://github.com/go-sql-driver/mysql#usage for details + package mysql import ( diff --git a/driver_ctx.go b/driver_ctx.go new file mode 100644 index 000000000..9a9aecd7a --- /dev/null +++ b/driver_ctx.go @@ -0,0 +1,187 @@ +// +build go1.8 + +// Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package mysql provides a MySQL driver for Go's database/sql package +// +// The driver should be used via the database/sql package: +// +// import "database/sql" +// import _ "github.com/go-sql-driver/mysql" +// +// db, err := sql.Open("mysql", "user:password@/dbname") +// +// See https://github.com/go-sql-driver/mysql#usage for details + +package mysql + +import ( + "context" + "database/sql" + "database/sql/driver" + "net" +) + +// MySQLDriver is exported to make the driver directly accessible. +// In general the driver is used via the database/sql package. +type MySQLDriver struct{} + +// DialFunc is a function which can be used to establish the network connection. +// Custom dial functions must be registered with RegisterDial +type DialFunc func(addr string) (net.Conn, error) + +var dials map[string]DialFunc + +// RegisterDial registers a custom dial function. It can then be used by the +// network address mynet(addr), where mynet is the registered new network. +// addr is passed as a parameter to the dial function. +func RegisterDial(net string, dial DialFunc) { + if dials == nil { + dials = make(map[string]DialFunc) + } + dials[net] = dial +} + +// Open new Connection. +// See https://github.com/go-sql-driver/mysql#dsn-data-source-name for how +// the DSN string is formated +func (d MySQLDriver) Open(dsn string) (driver.Conn, error) { + var err error + + // New mysqlConn + mc := &mysqlConn{ + maxAllowedPacket: maxPacketSize, + maxWriteSize: maxPacketSize - 1, + } + mc.cfg, err = ParseDSN(dsn) + if err != nil { + return nil, err + } + mc.parseTime = mc.cfg.ParseTime + mc.strict = mc.cfg.Strict + + // Connect to Server + if dial, ok := dials[mc.cfg.Net]; ok { + mc.netConn, err = dial(mc.cfg.Addr) + } else { + nd := net.Dialer{Timeout: mc.cfg.Timeout} + mc.netConn, err = nd.DialContext(context.Background(), mc.cfg.Net, mc.cfg.Addr) + } + if err != nil { + return nil, err + } + + // Enable TCP Keepalives on TCP connections + if tc, ok := mc.netConn.(*net.TCPConn); ok { + if err := tc.SetKeepAlive(true); err != nil { + // Don't send COM_QUIT before handshake. + mc.netConn.Close() + mc.netConn = nil + return nil, err + } + } + + mc.buf = newBuffer(mc.netConn) + + // Set I/O timeouts + mc.buf.timeout = mc.cfg.ReadTimeout + mc.writeTimeout = mc.cfg.WriteTimeout + + // Reading Handshake Initialization Packet + cipher, err := mc.readInitPacket() + if err != nil { + mc.cleanup() + return nil, err + } + + // Send Client Authentication Packet + if err = mc.writeAuthPacket(context.Background(), cipher); err != nil { + mc.cleanup() + return nil, err + } + + // Handle response to auth packet, switch methods if possible + if err = handleAuthResult(mc, cipher); err != nil { + // Authentication failed and MySQL has already closed the connection + // (https://dev.mysql.com/doc/internals/en/authentication-fails.html). + // Do not send COM_QUIT, just cleanup and return the error. + mc.cleanup() + return nil, err + } + + if mc.cfg.MaxAllowedPacket > 0 { + mc.maxAllowedPacket = mc.cfg.MaxAllowedPacket + } else { + // Get max allowed packet size + maxap, err := mc.getSystemVar("max_allowed_packet") + if err != nil { + mc.Close() + return nil, err + } + mc.maxAllowedPacket = stringToInt(maxap) - 1 + } + if mc.maxAllowedPacket < maxPacketSize { + mc.maxWriteSize = mc.maxAllowedPacket + } + + // Handle DSN Params + err = mc.handleParams() + if err != nil { + mc.Close() + return nil, err + } + + return mc, nil +} + +func handleAuthResult(mc *mysqlConn, oldCipher []byte) error { + // Read Result Packet + cipher, err := mc.readResultOK() + if err == nil { + return nil // auth successful + } + + if mc.cfg == nil { + return err // auth failed and retry not possible + } + + // Retry auth if configured to do so. + if mc.cfg.AllowOldPasswords && err == ErrOldPassword { + // Retry with old authentication method. Note: there are edge cases + // where this should work but doesn't; this is currently "wontfix": + // https://github.com/go-sql-driver/mysql/issues/184 + + // If CLIENT_PLUGIN_AUTH capability is not supported, no new cipher is + // sent and we have to keep using the cipher sent in the init packet. + if cipher == nil { + cipher = oldCipher + } + + if err = mc.writeOldAuthPacket(context.Background(), cipher); err != nil { + return err + } + _, err = mc.readResultOK() + } else if mc.cfg.AllowCleartextPasswords && err == ErrCleartextPassword { + // Retry with clear text password for + // http://dev.mysql.com/doc/refman/5.7/en/cleartext-authentication-plugin.html + // http://dev.mysql.com/doc/refman/5.7/en/pam-authentication-plugin.html + if err = mc.writeClearAuthPacket(context.Background()); err != nil { + return err + } + _, err = mc.readResultOK() + } else if mc.cfg.AllowNativePasswords && err == ErrNativePassword { + if err = mc.writeNativeAuthPacket(context.Background(), cipher); err != nil { + return err + } + _, err = mc.readResultOK() + } + return err +} + +func init() { + sql.Register("mysql", &MySQLDriver{}) +} diff --git a/driver_test.go b/driver_test.go index 78e68f5d0..d0772a443 100644 --- a/driver_test.go +++ b/driver_test.go @@ -182,6 +182,14 @@ func TestEmptyQuery(t *testing.T) { }) } +func (dbt *DBTest) TestPing(t *testing.T) { + runTests(t, dsn, func(dbt *DBTest) { + if err := dbt.db.Ping(); err != nil { + dbt.fail("Ping", "Ping", err) + } + }) +} + func TestCRUD(t *testing.T) { runTests(t, dsn, func(dbt *DBTest) { // Create Table diff --git a/infile.go b/infile.go index 547357cfa..60f06c13f 100644 --- a/infile.go +++ b/infile.go @@ -1,3 +1,5 @@ +// +build !go1.8 + // Go MySQL Driver - A MySQL-Driver for Go's database/sql package // // Copyright 2013 The Go-MySQL-Driver Authors. All rights reserved. diff --git a/infile_ctx.go b/infile_ctx.go new file mode 100644 index 000000000..bd7bb3abb --- /dev/null +++ b/infile_ctx.go @@ -0,0 +1,185 @@ +// +build go1.8 + +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2013 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "sync" +) + +var ( + fileRegister map[string]bool + fileRegisterLock sync.RWMutex + readerRegister map[string]func() io.Reader + readerRegisterLock sync.RWMutex +) + +// RegisterLocalFile adds the given file to the file whitelist, +// so that it can be used by "LOAD DATA LOCAL INFILE ". +// Alternatively you can allow the use of all local files with +// the DSN parameter 'allowAllFiles=true' +// +// filePath := "/home/gopher/data.csv" +// mysql.RegisterLocalFile(filePath) +// err := db.Exec("LOAD DATA LOCAL INFILE '" + filePath + "' INTO TABLE foo") +// if err != nil { +// ... +// +func RegisterLocalFile(filePath string) { + fileRegisterLock.Lock() + // lazy map init + if fileRegister == nil { + fileRegister = make(map[string]bool) + } + + fileRegister[strings.Trim(filePath, `"`)] = true + fileRegisterLock.Unlock() +} + +// DeregisterLocalFile removes the given filepath from the whitelist. +func DeregisterLocalFile(filePath string) { + fileRegisterLock.Lock() + delete(fileRegister, strings.Trim(filePath, `"`)) + fileRegisterLock.Unlock() +} + +// RegisterReaderHandler registers a handler function which is used +// to receive a io.Reader. +// The Reader can be used by "LOAD DATA LOCAL INFILE Reader::". +// If the handler returns a io.ReadCloser Close() is called when the +// request is finished. +// +// mysql.RegisterReaderHandler("data", func() io.Reader { +// var csvReader io.Reader // Some Reader that returns CSV data +// ... // Open Reader here +// return csvReader +// }) +// err := db.Exec("LOAD DATA LOCAL INFILE 'Reader::data' INTO TABLE foo") +// if err != nil { +// ... +// +func RegisterReaderHandler(name string, handler func() io.Reader) { + readerRegisterLock.Lock() + // lazy map init + if readerRegister == nil { + readerRegister = make(map[string]func() io.Reader) + } + + readerRegister[name] = handler + readerRegisterLock.Unlock() +} + +// DeregisterReaderHandler removes the ReaderHandler function with +// the given name from the registry. +func DeregisterReaderHandler(name string) { + readerRegisterLock.Lock() + delete(readerRegister, name) + readerRegisterLock.Unlock() +} + +func deferredClose(err *error, closer io.Closer) { + closeErr := closer.Close() + if *err == nil { + *err = closeErr + } +} + +func (mc *mysqlConn) handleInFileRequest(ctx context.Context, name string) (err error) { + var rdr io.Reader + var data []byte + packetSize := 16 * 1024 // 16KB is small enough for disk readahead and large enough for TCP + if mc.maxWriteSize < packetSize { + packetSize = mc.maxWriteSize + } + + if idx := strings.Index(name, "Reader::"); idx == 0 || (idx > 0 && name[idx-1] == '/') { // io.Reader + // The server might return an an absolute path. See issue #355. + name = name[idx+8:] + + readerRegisterLock.RLock() + handler, inMap := readerRegister[name] + readerRegisterLock.RUnlock() + + if inMap { + rdr = handler() + if rdr != nil { + if cl, ok := rdr.(io.Closer); ok { + defer deferredClose(&err, cl) + } + } else { + err = fmt.Errorf("Reader '%s' is ", name) + } + } else { + err = fmt.Errorf("Reader '%s' is not registered", name) + } + } else { // File + name = strings.Trim(name, `"`) + fileRegisterLock.RLock() + fr := fileRegister[name] + fileRegisterLock.RUnlock() + if mc.cfg.AllowAllFiles || fr { + var file *os.File + var fi os.FileInfo + + if file, err = os.Open(name); err == nil { + defer deferredClose(&err, file) + + // get file size + if fi, err = file.Stat(); err == nil { + rdr = file + if fileSize := int(fi.Size()); fileSize < packetSize { + packetSize = fileSize + } + } + } + } else { + err = fmt.Errorf("local file '%s' is not registered", name) + } + } + + // send content packets + if err == nil { + data := make([]byte, 4+packetSize) + var n int + for err == nil { + n, err = rdr.Read(data[4:]) + if n > 0 { + if ioErr := mc.writePacket(ctx, data[:4+n]); ioErr != nil { + return ioErr + } + } + } + if err == io.EOF { + err = nil + } + } + + // send empty packet (termination) + if data == nil { + data = make([]byte, 4) + } + if ioErr := mc.writePacket(ctx, data[:4]); ioErr != nil { + return ioErr + } + + // read OK packet + if err == nil { + _, err = mc.readResultOK() + return err + } + + mc.readPacket() + return err +} diff --git a/packets.go b/packets.go index aafe9793e..444fb9d68 100644 --- a/packets.go +++ b/packets.go @@ -1,3 +1,5 @@ +// +build !go1.8 + // Go MySQL Driver - A MySQL-Driver for Go's database/sql package // // Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. diff --git a/packets_ctx.go b/packets_ctx.go new file mode 100644 index 000000000..0bc637e72 --- /dev/null +++ b/packets_ctx.go @@ -0,0 +1,1306 @@ +// +build go1.8 + +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import ( + "bytes" + "context" + "crypto/tls" + "database/sql/driver" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "time" +) + +// Packets documentation: +// http://dev.mysql.com/doc/internals/en/client-server-protocol.html + +// Read packet to buffer 'data' +func (mc *mysqlConn) readPacket() ([]byte, error) { + var prevData []byte + for { + // read packet header + data, err := mc.buf.readNext(4) + if err != nil { + errLog.Print(err) + mc.Close() + return nil, driver.ErrBadConn + } + + // packet length [24 bit] + pktLen := int(uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16) + + // check packet sync [8 bit] + if data[3] != mc.sequence { + if data[3] > mc.sequence { + return nil, ErrPktSyncMul + } + return nil, ErrPktSync + } + mc.sequence++ + + // packets with length 0 terminate a previous packet which is a + // multiple of (2^24)−1 bytes long + if pktLen == 0 { + // there was no previous packet + if prevData == nil { + errLog.Print(ErrMalformPkt) + mc.Close() + return nil, driver.ErrBadConn + } + + return prevData, nil + } + + // read packet body [pktLen bytes] + data, err = mc.buf.readNext(pktLen) + if err != nil { + errLog.Print(err) + mc.Close() + return nil, driver.ErrBadConn + } + + // return data if this was the last packet + if pktLen < maxPacketSize { + // zero allocations for non-split packets + if prevData == nil { + return data, nil + } + + return append(prevData, data...), nil + } + + prevData = append(prevData, data...) + } +} + +// Write packet buffer 'data' +func (mc *mysqlConn) writePacket(ctx context.Context, data []byte) error { + if ctx == nil { + panic("context cannot be nil") + } + ctxDeadline, isCtxDeadlineSet := ctx.Deadline() + if isCtxDeadlineSet && !ctxDeadline.After(time.Now()) { + return errors.New("timeout") + } + + pktLen := len(data) - 4 + + if pktLen > mc.maxAllowedPacket { + return ErrPktTooLarge + } + + for { + var size int + if pktLen >= maxPacketSize { + data[0] = 0xff + data[1] = 0xff + data[2] = 0xff + size = maxPacketSize + } else { + data[0] = byte(pktLen) + data[1] = byte(pktLen >> 8) + data[2] = byte(pktLen >> 16) + size = pktLen + } + data[3] = mc.sequence + + // Write packet + var timeNow = time.Now() + var deadline = timeNow + if mc.writeTimeout > 0 { + deadline = timeNow.Add(mc.writeTimeout) + if isCtxDeadlineSet && deadline.After(ctxDeadline) { + deadline = ctxDeadline + } + } + if deadline.After(timeNow) { + if err := mc.netConn.SetWriteDeadline(deadline); err != nil { + return err + } + } + + n, err := mc.netConn.Write(data[:4+size]) + if err == nil && n == 4+size { + mc.sequence++ + if size != maxPacketSize { + return nil + } + pktLen -= size + data = data[size:] + continue + } + + // Handle error + if err == nil { // n != len(data) + errLog.Print(ErrMalformPkt) + } else { + errLog.Print(err) + } + return driver.ErrBadConn + } +} + +/****************************************************************************** +* Initialisation Process * +******************************************************************************/ + +// Handshake Initialization Packet +// http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake +func (mc *mysqlConn) readInitPacket() ([]byte, error) { + data, err := mc.readPacket() + if err != nil { + return nil, err + } + + if data[0] == iERR { + return nil, mc.handleErrorPacket(data) + } + + // protocol version [1 byte] + if data[0] < minProtocolVersion { + return nil, fmt.Errorf( + "unsupported protocol version %d. Version %d or higher is required", + data[0], + minProtocolVersion, + ) + } + + // server version [null terminated string] + // connection id [4 bytes] + pos := 1 + bytes.IndexByte(data[1:], 0x00) + 1 + 4 + + // first part of the password cipher [8 bytes] + cipher := data[pos : pos+8] + + // (filler) always 0x00 [1 byte] + pos += 8 + 1 + + // capability flags (lower 2 bytes) [2 bytes] + mc.flags = clientFlag(binary.LittleEndian.Uint16(data[pos : pos+2])) + if mc.flags&clientProtocol41 == 0 { + return nil, ErrOldProtocol + } + if mc.flags&clientSSL == 0 && mc.cfg.tls != nil { + return nil, ErrNoTLS + } + pos += 2 + + if len(data) > pos { + // character set [1 byte] + // status flags [2 bytes] + // capability flags (upper 2 bytes) [2 bytes] + // length of auth-plugin-data [1 byte] + // reserved (all [00]) [10 bytes] + pos += 1 + 2 + 2 + 1 + 10 + + // second part of the password cipher [mininum 13 bytes], + // where len=MAX(13, length of auth-plugin-data - 8) + // + // The web documentation is ambiguous about the length. However, + // according to mysql-5.7/sql/auth/sql_authentication.cc line 538, + // the 13th byte is "\0 byte, terminating the second part of + // a scramble". So the second part of the password cipher is + // a NULL terminated string that's at least 13 bytes with the + // last byte being NULL. + // + // The official Python library uses the fixed length 12 + // which seems to work but technically could have a hidden bug. + cipher = append(cipher, data[pos:pos+12]...) + + // TODO: Verify string termination + // EOF if version (>= 5.5.7 and < 5.5.10) or (>= 5.6.0 and < 5.6.2) + // \NUL otherwise + // + //if data[len(data)-1] == 0 { + // return + //} + //return ErrMalformPkt + + // make a memory safe copy of the cipher slice + var b [20]byte + copy(b[:], cipher) + return b[:], nil + } + + // make a memory safe copy of the cipher slice + var b [8]byte + copy(b[:], cipher) + return b[:], nil +} + +// Client Authentication Packet +// http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse +func (mc *mysqlConn) writeAuthPacket(ctx context.Context, cipher []byte) error { + // Adjust client flags based on server support + clientFlags := clientProtocol41 | + clientSecureConn | + clientLongPassword | + clientTransactions | + clientLocalFiles | + clientPluginAuth | + clientMultiResults | + mc.flags&clientLongFlag + + if mc.cfg.ClientFoundRows { + clientFlags |= clientFoundRows + } + + // To enable TLS / SSL + if mc.cfg.tls != nil { + clientFlags |= clientSSL + } + + if mc.cfg.MultiStatements { + clientFlags |= clientMultiStatements + } + + // User Password + scrambleBuff := scramblePassword(cipher, []byte(mc.cfg.Passwd)) + + pktLen := 4 + 4 + 1 + 23 + len(mc.cfg.User) + 1 + 1 + len(scrambleBuff) + 21 + 1 + + // To specify a db name + if n := len(mc.cfg.DBName); n > 0 { + clientFlags |= clientConnectWithDB + pktLen += n + 1 + } + + // Calculate packet length and get buffer with that size + data := mc.buf.takeSmallBuffer(pktLen + 4) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // ClientFlags [32 bit] + data[4] = byte(clientFlags) + data[5] = byte(clientFlags >> 8) + data[6] = byte(clientFlags >> 16) + data[7] = byte(clientFlags >> 24) + + // MaxPacketSize [32 bit] (none) + data[8] = 0x00 + data[9] = 0x00 + data[10] = 0x00 + data[11] = 0x00 + + // Charset [1 byte] + var found bool + data[12], found = collations[mc.cfg.Collation] + if !found { + // Note possibility for false negatives: + // could be triggered although the collation is valid if the + // collations map does not contain entries the server supports. + return errors.New("unknown collation") + } + + // SSL Connection Request Packet + // http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::SSLRequest + if mc.cfg.tls != nil { + // Send TLS / SSL request packet + if err := mc.writePacket(ctx, data[:(4+4+1+23)+4]); err != nil { + return err + } + + // Switch to TLS + tlsConn := tls.Client(mc.netConn, mc.cfg.tls) + if err := tlsConn.Handshake(); err != nil { + return err + } + mc.netConn = tlsConn + mc.buf.nc = tlsConn + } + + // Filler [23 bytes] (all 0x00) + pos := 13 + for ; pos < 13+23; pos++ { + data[pos] = 0 + } + + // User [null terminated string] + if len(mc.cfg.User) > 0 { + pos += copy(data[pos:], mc.cfg.User) + } + data[pos] = 0x00 + pos++ + + // ScrambleBuffer [length encoded integer] + data[pos] = byte(len(scrambleBuff)) + pos += 1 + copy(data[pos+1:], scrambleBuff) + + // Databasename [null terminated string] + if len(mc.cfg.DBName) > 0 { + pos += copy(data[pos:], mc.cfg.DBName) + data[pos] = 0x00 + pos++ + } + + // Assume native client during response + pos += copy(data[pos:], "mysql_native_password") + data[pos] = 0x00 + + // Send Auth packet + return mc.writePacket(ctx, data) +} + +// Client old authentication packet +// http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchResponse +func (mc *mysqlConn) writeOldAuthPacket(ctx context.Context, cipher []byte) error { + // User password + scrambleBuff := scrambleOldPassword(cipher, []byte(mc.cfg.Passwd)) + + // Calculate the packet length and add a tailing 0 + pktLen := len(scrambleBuff) + 1 + data := mc.buf.takeSmallBuffer(4 + pktLen) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // Add the scrambled password [null terminated string] + copy(data[4:], scrambleBuff) + data[4+pktLen-1] = 0x00 + + return mc.writePacket(ctx, data) +} + +// Client clear text authentication packet +// http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchResponse +func (mc *mysqlConn) writeClearAuthPacket(ctx context.Context) error { + // Calculate the packet length and add a tailing 0 + pktLen := len(mc.cfg.Passwd) + 1 + data := mc.buf.takeSmallBuffer(4 + pktLen) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // Add the clear password [null terminated string] + copy(data[4:], mc.cfg.Passwd) + data[4+pktLen-1] = 0x00 + + return mc.writePacket(ctx, data) +} + +// Native password authentication method +// http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchResponse +func (mc *mysqlConn) writeNativeAuthPacket(ctx context.Context, cipher []byte) error { + scrambleBuff := scramblePassword(cipher, []byte(mc.cfg.Passwd)) + + // Calculate the packet length and add a tailing 0 + pktLen := len(scrambleBuff) + data := mc.buf.takeSmallBuffer(4 + pktLen) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // Add the scramble + copy(data[4:], scrambleBuff) + + return mc.writePacket(ctx, data) +} + +/****************************************************************************** +* Command Packets * +******************************************************************************/ + +func (mc *mysqlConn) writeCommandPacket(ctx context.Context, command byte) error { + // Reset Packet Sequence + mc.sequence = 0 + + data := mc.buf.takeSmallBuffer(4 + 1) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // Add command byte + data[4] = command + + // Send CMD packet + return mc.writePacket(ctx, data) +} + +func (mc *mysqlConn) writeCommandPacketStr(ctx context.Context, command byte, arg string) error { + // Reset Packet Sequence + mc.sequence = 0 + + pktLen := 1 + len(arg) + data := mc.buf.takeBuffer(pktLen + 4) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // Add command byte + data[4] = command + + // Add arg + copy(data[5:], arg) + + // Send CMD packet + return mc.writePacket(ctx, data) +} + +func (mc *mysqlConn) writeCommandPacketUint32(ctx context.Context, command byte, arg uint32) error { + // Reset Packet Sequence + mc.sequence = 0 + + data := mc.buf.takeSmallBuffer(4 + 1 + 4) + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // Add command byte + data[4] = command + + // Add arg [32 bit] + data[5] = byte(arg) + data[6] = byte(arg >> 8) + data[7] = byte(arg >> 16) + data[8] = byte(arg >> 24) + + // Send CMD packet + return mc.writePacket(ctx, data) +} + +/****************************************************************************** +* Result Packets * +******************************************************************************/ + +// Returns error if Packet is not an 'Result OK'-Packet +func (mc *mysqlConn) readResultOK() ([]byte, error) { + data, err := mc.readPacket() + if err == nil { + // packet indicator + switch data[0] { + + case iOK: + return nil, mc.handleOkPacket(data) + + case iEOF: + if len(data) > 1 { + pluginEndIndex := bytes.IndexByte(data, 0x00) + plugin := string(data[1:pluginEndIndex]) + cipher := data[pluginEndIndex+1 : len(data)-1] + + if plugin == "mysql_old_password" { + // using old_passwords + return cipher, ErrOldPassword + } else if plugin == "mysql_clear_password" { + // using clear text password + return cipher, ErrCleartextPassword + } else if plugin == "mysql_native_password" { + // using mysql default authentication method + return cipher, ErrNativePassword + } else { + return cipher, ErrUnknownPlugin + } + } else { + // https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::OldAuthSwitchRequest + return nil, ErrOldPassword + } + + default: // Error otherwise + return nil, mc.handleErrorPacket(data) + } + } + return nil, err +} + +// Result Set Header Packet +// http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::Resultset +func (mc *mysqlConn) readResultSetHeaderPacket() (int, error) { + data, err := mc.readPacket() + if err == nil { + switch data[0] { + + case iOK: + return 0, mc.handleOkPacket(data) + + case iERR: + return 0, mc.handleErrorPacket(data) + + case iLocalInFile: + return 0, mc.handleInFileRequest(context.Background(), string(data[1:])) + } + + // column count + num, _, n := readLengthEncodedInteger(data) + if n-len(data) == 0 { + return int(num), nil + } + + return 0, ErrMalformPkt + } + return 0, err +} + +// Error Packet +// http://dev.mysql.com/doc/internals/en/generic-response-packets.html#packet-ERR_Packet +func (mc *mysqlConn) handleErrorPacket(data []byte) error { + if data[0] != iERR { + return ErrMalformPkt + } + + // 0xff [1 byte] + + // Error Number [16 bit uint] + errno := binary.LittleEndian.Uint16(data[1:3]) + + pos := 3 + + // SQL State [optional: # + 5bytes string] + if data[3] == 0x23 { + //sqlstate := string(data[4 : 4+5]) + pos = 9 + } + + // Error Message [string] + return &MySQLError{ + Number: errno, + Message: string(data[pos:]), + } +} + +func readStatus(b []byte) statusFlag { + return statusFlag(b[0]) | statusFlag(b[1])<<8 +} + +// Ok Packet +// http://dev.mysql.com/doc/internals/en/generic-response-packets.html#packet-OK_Packet +func (mc *mysqlConn) handleOkPacket(data []byte) error { + var n, m int + + // 0x00 [1 byte] + + // Affected rows [Length Coded Binary] + mc.affectedRows, _, n = readLengthEncodedInteger(data[1:]) + + // Insert id [Length Coded Binary] + mc.insertId, _, m = readLengthEncodedInteger(data[1+n:]) + + // server_status [2 bytes] + mc.status = readStatus(data[1+n+m : 1+n+m+2]) + if err := mc.discardResults(); err != nil { + return err + } + + // warning count [2 bytes] + if !mc.strict { + return nil + } + + pos := 1 + n + m + 2 + if binary.LittleEndian.Uint16(data[pos:pos+2]) > 0 { + return mc.getWarnings() + } + return nil +} + +// Read Packets as Field Packets until EOF-Packet or an Error appears +// http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnDefinition41 +func (mc *mysqlConn) readColumns(count int) ([]mysqlField, error) { + columns := make([]mysqlField, count) + + for i := 0; ; i++ { + data, err := mc.readPacket() + if err != nil { + return nil, err + } + + // EOF Packet + if data[0] == iEOF && (len(data) == 5 || len(data) == 1) { + if i == count { + return columns, nil + } + return nil, fmt.Errorf("column count mismatch n:%d len:%d", count, len(columns)) + } + + // Catalog + pos, err := skipLengthEncodedString(data) + if err != nil { + return nil, err + } + + // Database [len coded string] + n, err := skipLengthEncodedString(data[pos:]) + if err != nil { + return nil, err + } + pos += n + + // Table [len coded string] + if mc.cfg.ColumnsWithAlias { + tableName, _, n, err := readLengthEncodedString(data[pos:]) + if err != nil { + return nil, err + } + pos += n + columns[i].tableName = string(tableName) + } else { + n, err = skipLengthEncodedString(data[pos:]) + if err != nil { + return nil, err + } + pos += n + } + + // Original table [len coded string] + n, err = skipLengthEncodedString(data[pos:]) + if err != nil { + return nil, err + } + pos += n + + // Name [len coded string] + name, _, n, err := readLengthEncodedString(data[pos:]) + if err != nil { + return nil, err + } + columns[i].name = string(name) + pos += n + + // Original name [len coded string] + n, err = skipLengthEncodedString(data[pos:]) + if err != nil { + return nil, err + } + + // Filler [uint8] + // Charset [charset, collation uint8] + // Length [uint32] + pos += n + 1 + 2 + 4 + + // Field type [uint8] + columns[i].fieldType = data[pos] + pos++ + + // Flags [uint16] + columns[i].flags = fieldFlag(binary.LittleEndian.Uint16(data[pos : pos+2])) + pos += 2 + + // Decimals [uint8] + columns[i].decimals = data[pos] + //pos++ + + // Default value [len coded binary] + //if pos < len(data) { + // defaultVal, _, err = bytesToLengthCodedBinary(data[pos:]) + //} + } +} + +// Read Packets as Field Packets until EOF-Packet or an Error appears +// http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::ResultsetRow +func (rows *textRows) readRow(dest []driver.Value) error { + mc := rows.mc + + data, err := mc.readPacket() + if err != nil { + return err + } + + // EOF Packet + if data[0] == iEOF && len(data) == 5 { + // server_status [2 bytes] + rows.mc.status = readStatus(data[3:]) + err = rows.mc.discardResults() + if err == nil { + err = io.EOF + } else { + // connection unusable + rows.mc.Close() + } + rows.mc = nil + return err + } + if data[0] == iERR { + rows.mc = nil + return mc.handleErrorPacket(data) + } + + // RowSet Packet + var n int + var isNull bool + pos := 0 + + for i := range dest { + // Read bytes and convert to string + dest[i], isNull, n, err = readLengthEncodedString(data[pos:]) + pos += n + if err == nil { + if !isNull { + if !mc.parseTime { + continue + } else { + switch rows.columns[i].fieldType { + case fieldTypeTimestamp, fieldTypeDateTime, + fieldTypeDate, fieldTypeNewDate: + dest[i], err = parseDateTime( + string(dest[i].([]byte)), + mc.cfg.Loc, + ) + if err == nil { + continue + } + default: + continue + } + } + + } else { + dest[i] = nil + continue + } + } + return err // err != nil + } + + return nil +} + +// Reads Packets until EOF-Packet or an Error appears. Returns count of Packets read +func (mc *mysqlConn) readUntilEOF() error { + for { + data, err := mc.readPacket() + if err != nil { + return err + } + + switch data[0] { + case iERR: + return mc.handleErrorPacket(data) + case iEOF: + if len(data) == 5 { + mc.status = readStatus(data[3:]) + } + return nil + } + } +} + +/****************************************************************************** +* Prepared Statements * +******************************************************************************/ + +// Prepare Result Packets +// http://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html +func (stmt *mysqlStmt) readPrepareResultPacket() (uint16, error) { + data, err := stmt.mc.readPacket() + if err == nil { + // packet indicator [1 byte] + if data[0] != iOK { + return 0, stmt.mc.handleErrorPacket(data) + } + + // statement id [4 bytes] + stmt.id = binary.LittleEndian.Uint32(data[1:5]) + + // Column count [16 bit uint] + columnCount := binary.LittleEndian.Uint16(data[5:7]) + + // Param count [16 bit uint] + stmt.paramCount = int(binary.LittleEndian.Uint16(data[7:9])) + + // Reserved [8 bit] + + // Warning count [16 bit uint] + if !stmt.mc.strict { + return columnCount, nil + } + + // Check for warnings count > 0, only available in MySQL > 4.1 + if len(data) >= 12 && binary.LittleEndian.Uint16(data[10:12]) > 0 { + return columnCount, stmt.mc.getWarnings() + } + return columnCount, nil + } + return 0, err +} + +// http://dev.mysql.com/doc/internals/en/com-stmt-send-long-data.html +func (stmt *mysqlStmt) writeCommandLongData(ctx context.Context, paramID int, arg []byte) error { + maxLen := stmt.mc.maxAllowedPacket - 1 + pktLen := maxLen + + // After the header (bytes 0-3) follows before the data: + // 1 byte command + // 4 bytes stmtID + // 2 bytes paramID + const dataOffset = 1 + 4 + 2 + + // Can not use the write buffer since + // a) the buffer is too small + // b) it is in use + data := make([]byte, 4+1+4+2+len(arg)) + + copy(data[4+dataOffset:], arg) + + for argLen := len(arg); argLen > 0; argLen -= pktLen - dataOffset { + if dataOffset+argLen < maxLen { + pktLen = dataOffset + argLen + } + + stmt.mc.sequence = 0 + // Add command byte [1 byte] + data[4] = comStmtSendLongData + + // Add stmtID [32 bit] + data[5] = byte(stmt.id) + data[6] = byte(stmt.id >> 8) + data[7] = byte(stmt.id >> 16) + data[8] = byte(stmt.id >> 24) + + // Add paramID [16 bit] + data[9] = byte(paramID) + data[10] = byte(paramID >> 8) + + // Send CMD packet + err := stmt.mc.writePacket(ctx, data[:4+pktLen]) + if err == nil { + data = data[pktLen-dataOffset:] + continue + } + return err + + } + + // Reset Packet Sequence + stmt.mc.sequence = 0 + return nil +} + +// Execute Prepared Statement +// http://dev.mysql.com/doc/internals/en/com-stmt-execute.html +func (stmt *mysqlStmt) writeExecutePacket(ctx context.Context, args []driver.Value) error { + if len(args) != stmt.paramCount { + return fmt.Errorf( + "argument count mismatch (got: %d; has: %d)", + len(args), + stmt.paramCount, + ) + } + + const minPktLen = 4 + 1 + 4 + 1 + 4 + mc := stmt.mc + + // Reset packet-sequence + mc.sequence = 0 + + var data []byte + + if len(args) == 0 { + data = mc.buf.takeBuffer(minPktLen) + } else { + data = mc.buf.takeCompleteBuffer() + } + if data == nil { + // can not take the buffer. Something must be wrong with the connection + errLog.Print(ErrBusyBuffer) + return driver.ErrBadConn + } + + // command [1 byte] + data[4] = comStmtExecute + + // statement_id [4 bytes] + data[5] = byte(stmt.id) + data[6] = byte(stmt.id >> 8) + data[7] = byte(stmt.id >> 16) + data[8] = byte(stmt.id >> 24) + + // flags (0: CURSOR_TYPE_NO_CURSOR) [1 byte] + data[9] = 0x00 + + // iteration_count (uint32(1)) [4 bytes] + data[10] = 0x01 + data[11] = 0x00 + data[12] = 0x00 + data[13] = 0x00 + + if len(args) > 0 { + pos := minPktLen + + var nullMask []byte + if maskLen, typesLen := (len(args)+7)/8, 1+2*len(args); pos+maskLen+typesLen >= len(data) { + // buffer has to be extended but we don't know by how much so + // we depend on append after all data with known sizes fit. + // We stop at that because we deal with a lot of columns here + // which makes the required allocation size hard to guess. + tmp := make([]byte, pos+maskLen+typesLen) + copy(tmp[:pos], data[:pos]) + data = tmp + nullMask = data[pos : pos+maskLen] + pos += maskLen + } else { + nullMask = data[pos : pos+maskLen] + for i := 0; i < maskLen; i++ { + nullMask[i] = 0 + } + pos += maskLen + } + + // newParameterBoundFlag 1 [1 byte] + data[pos] = 0x01 + pos++ + + // type of each parameter [len(args)*2 bytes] + paramTypes := data[pos:] + pos += len(args) * 2 + + // value of each parameter [n bytes] + paramValues := data[pos:pos] + valuesCap := cap(paramValues) + + for i, arg := range args { + // build NULL-bitmap + if arg == nil { + nullMask[i/8] |= 1 << (uint(i) & 7) + paramTypes[i+i] = fieldTypeNULL + paramTypes[i+i+1] = 0x00 + continue + } + + // cache types and values + switch v := arg.(type) { + case int64: + paramTypes[i+i] = fieldTypeLongLong + paramTypes[i+i+1] = 0x00 + + if cap(paramValues)-len(paramValues)-8 >= 0 { + paramValues = paramValues[:len(paramValues)+8] + binary.LittleEndian.PutUint64( + paramValues[len(paramValues)-8:], + uint64(v), + ) + } else { + paramValues = append(paramValues, + uint64ToBytes(uint64(v))..., + ) + } + + case float64: + paramTypes[i+i] = fieldTypeDouble + paramTypes[i+i+1] = 0x00 + + if cap(paramValues)-len(paramValues)-8 >= 0 { + paramValues = paramValues[:len(paramValues)+8] + binary.LittleEndian.PutUint64( + paramValues[len(paramValues)-8:], + math.Float64bits(v), + ) + } else { + paramValues = append(paramValues, + uint64ToBytes(math.Float64bits(v))..., + ) + } + + case bool: + paramTypes[i+i] = fieldTypeTiny + paramTypes[i+i+1] = 0x00 + + if v { + paramValues = append(paramValues, 0x01) + } else { + paramValues = append(paramValues, 0x00) + } + + case []byte: + // Common case (non-nil value) first + if v != nil { + paramTypes[i+i] = fieldTypeString + paramTypes[i+i+1] = 0x00 + + if len(v) < mc.maxAllowedPacket-pos-len(paramValues)-(len(args)-(i+1))*64 { + paramValues = appendLengthEncodedInteger(paramValues, + uint64(len(v)), + ) + paramValues = append(paramValues, v...) + } else { + if err := stmt.writeCommandLongData(ctx, i, v); err != nil { + return err + } + } + continue + } + + // Handle []byte(nil) as a NULL value + nullMask[i/8] |= 1 << (uint(i) & 7) + paramTypes[i+i] = fieldTypeNULL + paramTypes[i+i+1] = 0x00 + + case string: + paramTypes[i+i] = fieldTypeString + paramTypes[i+i+1] = 0x00 + + if len(v) < mc.maxAllowedPacket-pos-len(paramValues)-(len(args)-(i+1))*64 { + paramValues = appendLengthEncodedInteger(paramValues, + uint64(len(v)), + ) + paramValues = append(paramValues, v...) + } else { + if err := stmt.writeCommandLongData(ctx, i, []byte(v)); err != nil { + return err + } + } + + case time.Time: + paramTypes[i+i] = fieldTypeString + paramTypes[i+i+1] = 0x00 + + var val []byte + if v.IsZero() { + val = []byte("0000-00-00") + } else { + val = []byte(v.In(mc.cfg.Loc).Format(timeFormat)) + } + + paramValues = appendLengthEncodedInteger(paramValues, + uint64(len(val)), + ) + paramValues = append(paramValues, val...) + + default: + return fmt.Errorf("can not convert type: %T", arg) + } + } + + // Check if param values exceeded the available buffer + // In that case we must build the data packet with the new values buffer + if valuesCap != cap(paramValues) { + data = append(data[:pos], paramValues...) + mc.buf.buf = data + } + + pos += len(paramValues) + data = data[:pos] + } + + return mc.writePacket(ctx, data) +} + +func (mc *mysqlConn) discardResults() error { + for mc.status&statusMoreResultsExists != 0 { + resLen, err := mc.readResultSetHeaderPacket() + if err != nil { + return err + } + if resLen > 0 { + // columns + if err := mc.readUntilEOF(); err != nil { + return err + } + // rows + if err := mc.readUntilEOF(); err != nil { + return err + } + } else { + mc.status &^= statusMoreResultsExists + } + } + return nil +} + +// http://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html +func (rows *binaryRows) readRow(dest []driver.Value) error { + data, err := rows.mc.readPacket() + if err != nil { + return err + } + + // packet indicator [1 byte] + if data[0] != iOK { + // EOF Packet + if data[0] == iEOF && len(data) == 5 { + rows.mc.status = readStatus(data[3:]) + err = rows.mc.discardResults() + if err == nil { + err = io.EOF + } else { + // connection unusable + rows.mc.Close() + } + rows.mc = nil + return err + } + rows.mc = nil + + // Error otherwise + return rows.mc.handleErrorPacket(data) + } + + // NULL-bitmap, [(column-count + 7 + 2) / 8 bytes] + pos := 1 + (len(dest)+7+2)>>3 + nullMask := data[1:pos] + + for i := range dest { + // Field is NULL + // (byte >> bit-pos) % 2 == 1 + if ((nullMask[(i+2)>>3] >> uint((i+2)&7)) & 1) == 1 { + dest[i] = nil + continue + } + + // Convert to byte-coded string + switch rows.columns[i].fieldType { + case fieldTypeNULL: + dest[i] = nil + continue + + // Numeric Types + case fieldTypeTiny: + if rows.columns[i].flags&flagUnsigned != 0 { + dest[i] = int64(data[pos]) + } else { + dest[i] = int64(int8(data[pos])) + } + pos++ + continue + + case fieldTypeShort, fieldTypeYear: + if rows.columns[i].flags&flagUnsigned != 0 { + dest[i] = int64(binary.LittleEndian.Uint16(data[pos : pos+2])) + } else { + dest[i] = int64(int16(binary.LittleEndian.Uint16(data[pos : pos+2]))) + } + pos += 2 + continue + + case fieldTypeInt24, fieldTypeLong: + if rows.columns[i].flags&flagUnsigned != 0 { + dest[i] = int64(binary.LittleEndian.Uint32(data[pos : pos+4])) + } else { + dest[i] = int64(int32(binary.LittleEndian.Uint32(data[pos : pos+4]))) + } + pos += 4 + continue + + case fieldTypeLongLong: + if rows.columns[i].flags&flagUnsigned != 0 { + val := binary.LittleEndian.Uint64(data[pos : pos+8]) + if val > math.MaxInt64 { + dest[i] = uint64ToString(val) + } else { + dest[i] = int64(val) + } + } else { + dest[i] = int64(binary.LittleEndian.Uint64(data[pos : pos+8])) + } + pos += 8 + continue + + case fieldTypeFloat: + dest[i] = float32(math.Float32frombits(binary.LittleEndian.Uint32(data[pos : pos+4]))) + pos += 4 + continue + + case fieldTypeDouble: + dest[i] = math.Float64frombits(binary.LittleEndian.Uint64(data[pos : pos+8])) + pos += 8 + continue + + // Length coded Binary Strings + case fieldTypeDecimal, fieldTypeNewDecimal, fieldTypeVarChar, + fieldTypeBit, fieldTypeEnum, fieldTypeSet, fieldTypeTinyBLOB, + fieldTypeMediumBLOB, fieldTypeLongBLOB, fieldTypeBLOB, + fieldTypeVarString, fieldTypeString, fieldTypeGeometry, fieldTypeJSON: + var isNull bool + var n int + dest[i], isNull, n, err = readLengthEncodedString(data[pos:]) + pos += n + if err == nil { + if !isNull { + continue + } else { + dest[i] = nil + continue + } + } + return err + + case + fieldTypeDate, fieldTypeNewDate, // Date YYYY-MM-DD + fieldTypeTime, // Time [-][H]HH:MM:SS[.fractal] + fieldTypeTimestamp, fieldTypeDateTime: // Timestamp YYYY-MM-DD HH:MM:SS[.fractal] + + num, isNull, n := readLengthEncodedInteger(data[pos:]) + pos += n + + switch { + case isNull: + dest[i] = nil + continue + case rows.columns[i].fieldType == fieldTypeTime: + // database/sql does not support an equivalent to TIME, return a string + var dstlen uint8 + switch decimals := rows.columns[i].decimals; decimals { + case 0x00, 0x1f: + dstlen = 8 + case 1, 2, 3, 4, 5, 6: + dstlen = 8 + 1 + decimals + default: + return fmt.Errorf( + "protocol error, illegal decimals value %d", + rows.columns[i].decimals, + ) + } + dest[i], err = formatBinaryDateTime(data[pos:pos+int(num)], dstlen, true) + case rows.mc.parseTime: + dest[i], err = parseBinaryDateTime(num, data[pos:], rows.mc.cfg.Loc) + default: + var dstlen uint8 + if rows.columns[i].fieldType == fieldTypeDate { + dstlen = 10 + } else { + switch decimals := rows.columns[i].decimals; decimals { + case 0x00, 0x1f: + dstlen = 19 + case 1, 2, 3, 4, 5, 6: + dstlen = 19 + 1 + decimals + default: + return fmt.Errorf( + "protocol error, illegal decimals value %d", + rows.columns[i].decimals, + ) + } + } + dest[i], err = formatBinaryDateTime(data[pos:pos+int(num)], dstlen, false) + } + + if err == nil { + pos += int(num) + continue + } else { + return err + } + + // Please report if this happens! + default: + return fmt.Errorf("unknown field type %d", rows.columns[i].fieldType) + } + } + + return nil +} diff --git a/statement.go b/statement.go index 7f9b04585..60c708861 100644 --- a/statement.go +++ b/statement.go @@ -1,3 +1,5 @@ +// +build !go1.8 + // Go MySQL Driver - A MySQL-Driver for Go's database/sql package // // Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. diff --git a/statement_ctx.go b/statement_ctx.go new file mode 100644 index 000000000..3ed483899 --- /dev/null +++ b/statement_ctx.go @@ -0,0 +1,171 @@ +// +build go1.8 + +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import ( + "context" + "database/sql/driver" + "fmt" + "reflect" + "strconv" +) + +type mysqlStmt struct { + mc *mysqlConn + id uint32 + paramCount int + columns []mysqlField // cached from the first query +} + +// Close implements driver.Conn interface +func (stmt *mysqlStmt) Close() error { + if stmt.mc == nil || stmt.mc.netConn == nil { + // driver.Stmt.Close can be called more than once, thus this function + // has to be idempotent. + // See also Issue #450 and golang/go#16019. + //errLog.Print(ErrInvalidConn) + return driver.ErrBadConn + } + + err := stmt.mc.writeCommandPacketUint32(context.Background(), comStmtClose, stmt.id) + stmt.mc = nil + return err +} + +// NumInput implements driver.Stmt interface +func (stmt *mysqlStmt) NumInput() int { + return stmt.paramCount +} + +func (stmt *mysqlStmt) ColumnConverter(idx int) driver.ValueConverter { + return converter{} +} + +// Exec implements driver.Stmt interface +func (stmt *mysqlStmt) Exec(args []driver.Value) (driver.Result, error) { + return stmt.ExecContext(context.Background(), args) +} + +// ExecContent implements driver.StmtExecContext interface +func (stmt *mysqlStmt) ExecContext(ctx context.Context, args []driver.Value) (driver.Result, error) { + + if stmt.mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return nil, driver.ErrBadConn + } + // Send command + err := stmt.writeExecutePacket(ctx, args) + if err != nil { + return nil, err + } + + mc := stmt.mc + + mc.affectedRows = 0 + mc.insertId = 0 + + // Read Result + resLen, err := mc.readResultSetHeaderPacket() + if err == nil { + if resLen > 0 { + // Columns + err = mc.readUntilEOF() + if err != nil { + return nil, err + } + + // Rows + err = mc.readUntilEOF() + } + if err == nil { + return &mysqlResult{ + affectedRows: int64(mc.affectedRows), + insertId: int64(mc.insertId), + }, nil + } + } + + return nil, err +} + +// Query implements driver.Stmt interface +func (stmt *mysqlStmt) Query(args []driver.Value) (driver.Rows, error) { + return stmt.QueryContext(context.Background(), args) +} + +// QueryContext implements driver.StmtQueryContext interface +func (stmt *mysqlStmt) QueryContext(ctx context.Context, args []driver.Value) (driver.Rows, error) { + if stmt.mc.netConn == nil { + errLog.Print(ErrInvalidConn) + return nil, driver.ErrBadConn + } + // Send command + err := stmt.writeExecutePacket(ctx, args) + if err != nil { + return nil, err + } + + mc := stmt.mc + + // Read Result + resLen, err := mc.readResultSetHeaderPacket() + if err != nil { + return nil, err + } + + rows := new(binaryRows) + + if resLen > 0 { + rows.mc = mc + // Columns + // If not cached, read them and cache them + if stmt.columns == nil { + rows.columns, err = mc.readColumns(resLen) + stmt.columns = rows.columns + } else { + rows.columns = stmt.columns + err = mc.readUntilEOF() + } + } + + return rows, err +} + +type converter struct{} + +func (c converter) ConvertValue(v interface{}) (driver.Value, error) { + if driver.IsValue(v) { + return v, nil + } + + rv := reflect.ValueOf(v) + switch rv.Kind() { + case reflect.Ptr: + // indirect pointers + if rv.IsNil() { + return nil, nil + } + return c.ConvertValue(rv.Elem().Interface()) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return rv.Int(), nil + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32: + return int64(rv.Uint()), nil + case reflect.Uint64: + u64 := rv.Uint() + if u64 >= 1<<63 { + return strconv.FormatUint(u64, 10), nil + } + return int64(u64), nil + case reflect.Float32, reflect.Float64: + return rv.Float(), nil + } + return nil, fmt.Errorf("unsupported type %T, a %s", v, rv.Kind()) +} diff --git a/transaction.go b/transaction.go index 33c749b35..c47b52b45 100644 --- a/transaction.go +++ b/transaction.go @@ -1,3 +1,5 @@ +// +build !go1.8 + // Go MySQL Driver - A MySQL-Driver for Go's database/sql package // // Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. diff --git a/transaction_ctx.go b/transaction_ctx.go new file mode 100644 index 000000000..0f4d4faed --- /dev/null +++ b/transaction_ctx.go @@ -0,0 +1,37 @@ +// +build go1.8 + +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import "context" + +type mysqlTx struct { + mc *mysqlConn +} + +// Commit implements driver.Tx interface +func (tx *mysqlTx) Commit() (err error) { + if tx.mc == nil || tx.mc.netConn == nil { + return ErrInvalidConn + } + err = tx.mc.exec(context.Background(), "COMMIT") + tx.mc = nil + return +} + +// Rollback implements driver.Tx interface +func (tx *mysqlTx) Rollback() (err error) { + if tx.mc == nil || tx.mc.netConn == nil { + return ErrInvalidConn + } + err = tx.mc.exec(context.Background(), "ROLLBACK") + tx.mc = nil + return +}