diff --git a/CHANGELOG.md b/CHANGELOG.md index 24e7c5f65..6f021dd84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,17 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Changed +- queue module version bumped to 1.3.0 (#278) + ### Fixed - Several non-critical data race issues (#218) - ConnectionPool does not properly handle disconnection with Opts.Reconnect set (#272) +- Connect() panics on concurrent schema update (#278) +- Wrong Ttr setup by Queue.Cfg() (#278) +- Flaky queue/Example_connectionPool (#278) +- Flaky queue/Example_simpleQueueCustomMsgPack (#277) ## [1.10.0] - 2022-12-31 diff --git a/Makefile b/Makefile index 2f4dd8d93..cc689dae7 100644 --- a/Makefile +++ b/Makefile @@ -26,8 +26,8 @@ clean: .PHONY: deps deps: clean - ( cd ./queue/testdata; $(TTCTL) rocks install queue 1.2.1 ) - ( cd ./crud/testdata; $(TTCTL) rocks install crud 1.0.0 ) + ( cd ./queue/testdata; $(TTCTL) rocks install queue 1.3.0 ) + ( cd ./crud/testdata; $(TTCTL) rocks install crud 1.1.1 ) .PHONY: datetime-timezones datetime-timezones: diff --git a/config.lua b/config.lua index eadfb3825..aafb98ceb 100644 --- a/config.lua +++ b/config.lua @@ -181,6 +181,23 @@ local function push_func(cnt) end rawset(_G, 'push_func', push_func) +local function create_spaces() + for i=1,10 do + local s = box.schema.space.create('test' .. tostring(i), { + id = 700 + i, + if_not_exists = true, + }) + local idx = s:create_index('test' .. tostring(i) .. 'primary', { + type = 'tree', + parts = {1, 'uint'}, + if_not_exists = true + }) + idx:drop() + s:drop() + end +end +rawset(_G, 'create_spaces', create_spaces) + local function tarantool_version_at_least(wanted_major, wanted_minor, wanted_patch) -- https://github.com/tarantool/crud/blob/733528be02c1ffa3dacc12c034ee58c9903127fc/test/helper.lua#L316-L337 local major_minor_patch = _TARANTOOL:split('-', 1)[1] diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 62e55ea3c..bb1391950 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -33,7 +33,7 @@ var servers = []string{ } var connOpts = tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", } diff --git a/crud/example_test.go b/crud/example_test.go index 3f2ebbf88..2b80212ca 100644 --- a/crud/example_test.go +++ b/crud/example_test.go @@ -15,7 +15,7 @@ const ( ) var exampleOpts = tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", } diff --git a/crud/tarantool_test.go b/crud/tarantool_test.go index 71f7d62b1..6104e3621 100644 --- a/crud/tarantool_test.go +++ b/crud/tarantool_test.go @@ -20,7 +20,7 @@ var invalidSpaceName = "invalid" var indexNo = uint32(0) var indexName = "primary_index" var opts = tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", } @@ -104,6 +104,33 @@ var object = crud.MapObject{ "name": "bla", } +func connect(t testing.TB) *tarantool.Connection { + for i := 0; i < 10; i++ { + conn, err := tarantool.Connect(server, opts) + if err != nil { + t.Fatalf("Failed to connect: %s", err) + } + + ret := struct { + _msgpack struct{} `msgpack:",asArray"` //nolint: structcheck,unused + Result bool + }{} + err = conn.Do(tarantool.NewCall17Request("is_ready")).GetTyped(&ret) + if err != nil { + t.Fatalf("Failed to check is_ready: %s", err) + } + + if ret.Result { + return conn + } + + time.Sleep(time.Second) + } + + t.Fatalf("Failed to wait for a ready state connect.") + return nil +} + var testProcessDataCases = []struct { name string expectedRespLen int @@ -454,7 +481,7 @@ func testCrudRequestCheck(t *testing.T, req tarantool.Request, } func TestCrudGenerateData(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() for _, testCase := range testGenerateDataCases { @@ -477,7 +504,7 @@ func TestCrudGenerateData(t *testing.T) { } func TestCrudProcessData(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() for _, testCase := range testProcessDataCases { @@ -527,7 +554,7 @@ func TestUnflattenRows(t *testing.T) { tpls []interface{} ) - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() // Do `replace`. @@ -586,7 +613,7 @@ func TestUnflattenRows(t *testing.T) { } func TestResultWithErr(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() for _, testCase := range testResultWithErrCases { @@ -601,7 +628,7 @@ func TestResultWithErr(t *testing.T) { } func TestBoolResult(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() req := crud.MakeTruncateRequest(spaceName).Opts(baseOpts) @@ -624,7 +651,7 @@ func TestBoolResult(t *testing.T) { } func TestNumberResult(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() req := crud.MakeCountRequest(spaceName).Opts(countOpts) @@ -665,7 +692,7 @@ func TestBaseResult(t *testing.T) { }, } - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() req := crud.MakeSelectRequest(spaceName).Opts(selectOpts) @@ -708,7 +735,7 @@ func TestManyResult(t *testing.T) { }, } - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() req := crud.MakeReplaceManyRequest(spaceName).Tuples(tuples).Opts(opManyOpts) @@ -733,7 +760,7 @@ func TestManyResult(t *testing.T) { } func TestStorageInfoResult(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, server, opts) + conn := connect(t) defer conn.Close() req := crud.MakeStorageInfoRequest().Opts(baseOpts) diff --git a/crud/testdata/config.lua b/crud/testdata/config.lua index 9f8b2d5db..4f4db077f 100644 --- a/crud/testdata/config.lua +++ b/crud/testdata/config.lua @@ -59,6 +59,16 @@ s:create_index('bucket_id', { unique = false, }) +local function is_ready_false() + return false +end + +local function is_ready_true() + return true +end + +rawset(_G, 'is_ready', is_ready_false) + -- Setup vshard. _G.vshard = vshard box.once('guest', function() @@ -93,7 +103,5 @@ box.schema.user.grant('test', 'execute', 'universe', nil, { if_not_exists = true box.schema.user.grant('test', 'create,read,write,drop,alter', 'space', nil, { if_not_exists = true }) box.schema.user.grant('test', 'create', 'sequence', nil, { if_not_exists = true }) --- Set listen only when every other thing is configured. -box.cfg{ - listen = os.getenv("TEST_TNT_LISTEN"), -} +-- Set is_ready = is_ready_true only when every other thing is configured. +rawset(_G, 'is_ready', is_ready_true) diff --git a/datetime/datetime_test.go b/datetime/datetime_test.go index f4cc8b2a1..4398c7f88 100644 --- a/datetime/datetime_test.go +++ b/datetime/datetime_test.go @@ -37,7 +37,7 @@ var isDatetimeSupported = false var server = "127.0.0.1:3013" var opts = Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", } diff --git a/decimal/example_test.go b/decimal/example_test.go index 1d335a4c3..346419125 100644 --- a/decimal/example_test.go +++ b/decimal/example_test.go @@ -22,7 +22,7 @@ import ( func Example() { server := "127.0.0.1:3013" opts := tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, Reconnect: 1 * time.Second, MaxReconnects: 3, User: "test", diff --git a/example_test.go b/example_test.go index 7b0ee9a6a..7d16893e1 100644 --- a/example_test.go +++ b/example_test.go @@ -799,7 +799,7 @@ func ExampleConnection_Eval() { func ExampleConnect() { conn, err := tarantool.Connect("127.0.0.1:3013", tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", Concurrency: 32, @@ -895,11 +895,9 @@ func ExampleConnection_Execute() { } server := "127.0.0.1:3013" opts := tarantool.Opts{ - Timeout: 500 * time.Millisecond, - Reconnect: 1 * time.Second, - MaxReconnects: 3, - User: "test", - Pass: "test", + Timeout: 5 * time.Second, + User: "test", + Pass: "test", } client, err := tarantool.Connect(server, opts) if err != nil { @@ -1015,11 +1013,9 @@ func ExampleConnection_NewPrepared() { server := "127.0.0.1:3013" opts := tarantool.Opts{ - Timeout: 500 * time.Millisecond, - Reconnect: 1 * time.Second, - MaxReconnects: 3, - User: "test", - Pass: "test", + Timeout: 5 * time.Second, + User: "test", + Pass: "test", } conn, err := tarantool.Connect(server, opts) if err != nil { @@ -1057,8 +1053,8 @@ func ExampleConnection_NewWatcher() { server := "127.0.0.1:3013" opts := tarantool.Opts{ - Timeout: 500 * time.Millisecond, - Reconnect: 1 * time.Second, + Timeout: 5 * time.Second, + Reconnect: 5 * time.Second, MaxReconnects: 3, User: "test", Pass: "test", diff --git a/multi/example_test.go b/multi/example_test.go index e43461141..1ef369e4c 100644 --- a/multi/example_test.go +++ b/multi/example_test.go @@ -9,7 +9,7 @@ import ( func ExampleConnect() { multiConn, err := Connect([]string{"127.0.0.1:3031", "127.0.0.1:3032"}, tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", }) @@ -21,7 +21,7 @@ func ExampleConnect() { func ExampleConnectWithOpts() { multiConn, err := ConnectWithOpts([]string{"127.0.0.1:3301", "127.0.0.1:3302"}, tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", }, OptsMulti{ diff --git a/multi/multi_test.go b/multi/multi_test.go index fc165d1d4..2dff7a0d1 100644 --- a/multi/multi_test.go +++ b/multi/multi_test.go @@ -20,7 +20,7 @@ var spaceNo = uint32(617) var spaceName = "test" var indexNo = uint32(0) var connOpts = tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", } diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index e41cdc639..7e80110fd 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -18,12 +18,12 @@ type QueueConnectionHandler struct { name string cfg queue.Cfg - uuid uuid.UUID - registered bool - err error - mutex sync.Mutex - masterUpdated chan struct{} - masterCnt int32 + uuid uuid.UUID + registered bool + err error + mutex sync.Mutex + updated chan struct{} + masterCnt int32 } // QueueConnectionHandler implements the ConnectionHandler interface. @@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{} // NewQueueConnectionHandler creates a QueueConnectionHandler object. func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler { return &QueueConnectionHandler{ - name: name, - cfg: cfg, - masterUpdated: make(chan struct{}, 10), + name: name, + cfg: cfg, + updated: make(chan struct{}, 10), } } @@ -53,14 +53,30 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, } master := role == connection_pool.MasterRole - if master { - defer func() { - h.masterUpdated <- struct{}{} - }() - } - // Set up a queue module configuration for an instance. q := queue.New(conn, h.name) + + // Check is queue ready to work. + if state, err := q.State(); err != nil { + h.updated <- struct{}{} + h.err = err + return err + } else if master && state != queue.RunningState { + return fmt.Errorf("queue state is not RUNNING: %d", state) + } else if !master && state != queue.InitState && state != queue.WaitingState { + return fmt.Errorf("queue state is not INIT and not WAITING: %d", state) + } + + defer func() { + h.updated <- struct{}{} + }() + + // Set up a queue module configuration for an instance. Ideally, this + // should be done before box.cfg({}) or you need to wait some time + // before start a work. + // + // See: + // https://github.com/tarantool/queue/issues/206 opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second} if h.err = q.Cfg(opts); h.err != nil { @@ -72,10 +88,6 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, return nil } - if h.err = q.Create(h.cfg); h.err != nil { - return h.err - } - if !h.registered { // We register a shared session at the first time. if h.uuid, h.err = q.Identify(nil); h.err != nil { @@ -89,6 +101,10 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, } } + if h.err = q.Create(h.cfg); h.err != nil { + return h.err + } + fmt.Printf("Master %s is ready to work!\n", conn.Addr()) atomic.AddInt32(&h.masterCnt, 1) @@ -106,7 +122,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, // Closes closes a QueueConnectionHandler object. func (h *QueueConnectionHandler) Close() { - close(h.masterUpdated) + close(h.updated) } // Example demonstrates how to use the queue package with the connection_pool @@ -140,12 +156,12 @@ func Example_connectionPool() { "127.0.0.1:3015", } connOpts := tarantool.Opts{ - Timeout: 1 * time.Second, + Timeout: 5 * time.Second, User: "test", Pass: "test", } poolOpts := connection_pool.OptsPool{ - CheckTimeout: 1 * time.Second, + CheckTimeout: 5 * time.Second, ConnectionHandler: h, } connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts) @@ -155,8 +171,10 @@ func Example_connectionPool() { } defer connPool.Close() - // Wait for a master instance identification in the queue. - <-h.masterUpdated + // Wait for a queue initialization and master instance identification in + // the queue. + <-h.updated + <-h.updated if h.err != nil { fmt.Printf("Unable to identify in the pool: %s", h.err) return @@ -177,14 +195,17 @@ func Example_connectionPool() { // Switch a master instance in the pool. roles := []bool{true, false} - err = test_helpers.SetClusterRO(servers, connOpts, roles) - if err != nil { - fmt.Printf("Unable to set cluster roles: %s", err) - return + for { + err := test_helpers.SetClusterRO(servers, connOpts, roles) + if err == nil { + break + } } - // Wait for a new master instance re-identification. - <-h.masterUpdated + // Wait for a replica instance connection and a new master instance + // re-identification. + <-h.updated + <-h.updated h.mutex.Lock() err = h.err h.mutex.Unlock() @@ -204,17 +225,24 @@ func Example_connectionPool() { time.Sleep(poolOpts.CheckTimeout) } - // Take a data from the new master instance. - task, err := q.Take() - if err != nil { - fmt.Println("Unable to got task:", err) - } else if task == nil { - fmt.Println("task == nil") - } else if task.Data() == nil { - fmt.Println("task.Data() == nil") - } else { - task.Ack() - fmt.Println("Got data:", task.Data()) + for { + // Take a data from the new master instance. + task, err := q.Take() + + if err == connection_pool.ErrNoRwInstance { + // It may be not registered yet by the pool. + continue + } else if err != nil { + fmt.Println("Unable to got task:", err) + } else if task == nil { + fmt.Println("task == nil") + } else if task.Data() == nil { + fmt.Println("task.Data() == nil") + } else { + task.Ack() + fmt.Println("Got data:", task.Data()) + } + break } // Output: diff --git a/queue/example_msgpack_test.go b/queue/example_msgpack_test.go index 2ed7f5542..5b179f5c0 100644 --- a/queue/example_msgpack_test.go +++ b/queue/example_msgpack_test.go @@ -48,7 +48,7 @@ func (c *dummyData) EncodeMsgpack(e *encoder) error { func Example_simpleQueueCustomMsgPack() { opts := tarantool.Opts{ Reconnect: time.Second, - Timeout: 2500 * time.Millisecond, + Timeout: 5 * time.Second, MaxReconnects: 5, User: "test", Pass: "test", @@ -65,9 +65,9 @@ func Example_simpleQueueCustomMsgPack() { IfNotExists: true, Kind: queue.FIFO, Opts: queue.Opts{ - Ttl: 10 * time.Second, - Ttr: 5 * time.Second, - Delay: 3 * time.Second, + Ttl: 20 * time.Second, + Ttr: 10 * time.Second, + Delay: 6 * time.Second, Pri: 1, }, } diff --git a/queue/queue.go b/queue/queue.go index df13f09bb..8d161b033 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -143,7 +143,7 @@ func (opts CfgOpts) toMap() map[string]interface{} { ret := make(map[string]interface{}) ret["in_replicaset"] = opts.InReplicaset if opts.Ttr != 0 { - ret["ttr"] = opts.Ttr + ret["ttr"] = opts.Ttr.Seconds() } return ret } diff --git a/queue/queue_test.go b/queue/queue_test.go index 0a3d4e919..f159a6334 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -22,7 +22,7 @@ var serversPool = []string{ var instances []test_helpers.TarantoolInstance var opts = Opts{ - Timeout: 2500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", //Concurrency: 32, @@ -938,13 +938,22 @@ func runTestMain(m *testing.M) int { defer test_helpers.StopTarantoolInstances(instances) - roles := []bool{false, true} - connOpts := Opts{ - Timeout: 500 * time.Millisecond, - User: "test", - Pass: "test", + for i := 0; i < 10; i++ { + // We need to skip bootstrap errors and to make sure that cluster is + // configured. + roles := []bool{false, true} + connOpts := Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", + } + + err = test_helpers.SetClusterRO(serversPool, connOpts, roles) + if err == nil { + break + } + time.Sleep(time.Second) } - err = test_helpers.SetClusterRO(serversPool, connOpts, roles) if err != nil { log.Fatalf("Failed to set roles in tarantool pool: %s", err) diff --git a/queue/testdata/pool.lua b/queue/testdata/pool.lua index 1bcc11654..9ca98bbf1 100644 --- a/queue/testdata/pool.lua +++ b/queue/testdata/pool.lua @@ -25,6 +25,9 @@ end local queue = require('queue') rawset(_G, 'queue', queue) +-- queue.cfg({in_replicaset = true}) should be called before box.cfg({}) +-- https://github.com/tarantool/queue/issues/206 +queue.cfg({in_replicaset = true, ttr = 60}) local listen = os.getenv("TEST_TNT_LISTEN") box.cfg{ diff --git a/schema.go b/schema.go index a182e8b10..87c788fe0 100644 --- a/schema.go +++ b/schema.go @@ -325,8 +325,13 @@ func (conn *Connection) loadSchema() (err error) { return err } for _, index := range indexes { - schema.SpacesById[index.SpaceId].IndexesById[index.Id] = index - schema.SpacesById[index.SpaceId].Indexes[index.Name] = index + spaceId := index.SpaceId + if _, ok := schema.SpacesById[spaceId]; ok { + schema.SpacesById[spaceId].IndexesById[index.Id] = index + schema.SpacesById[spaceId].Indexes[index.Name] = index + } else { + return errors.New("concurrent schema update") + } } conn.lockShards() diff --git a/settings/tarantool_test.go b/settings/tarantool_test.go index d32767116..9183fb8e8 100644 --- a/settings/tarantool_test.go +++ b/settings/tarantool_test.go @@ -19,7 +19,7 @@ var isSettingsSupported = false var server = "127.0.0.1:3013" var opts = tarantool.Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", } diff --git a/shutdown_test.go b/shutdown_test.go index d9b1db111..105a4dff6 100644 --- a/shutdown_test.go +++ b/shutdown_test.go @@ -418,9 +418,12 @@ func TestGracefulShutdownCloseConcurrent(t *testing.T) { // Do not wait till Tarantool register out watcher, // test everything is ok even on async. - - conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) - defer conn.Close() + conn, err := Connect(shtdnServer, shtdnClntOpts) + if err != nil { + t.Errorf("Failed to connect: %s", err) + } else { + defer conn.Close() + } // Wait till all connections created. srvToStop.Done() diff --git a/tarantool_test.go b/tarantool_test.go index 125642dcf..afea793ed 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -75,7 +75,7 @@ var spaceName = "test" var indexNo = uint32(0) var indexName = "primary" var opts = Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", //Concurrency: 32, @@ -3893,6 +3893,29 @@ func TestWatcher_Unregister_concurrent(t *testing.T) { wg.Wait() } +func TestConnect_schema_update(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + for i := 0; i < 100; i++ { + fut := conn.Do(NewCallRequest("create_spaces")) + + if conn, err := Connect(server, opts); err != nil { + if err.Error() != "concurrent schema update" { + t.Errorf("unexpected error: %s", err) + } + } else if conn == nil { + t.Errorf("conn is nil") + } else { + conn.Close() + } + + if _, err := fut.Get(); err != nil { + t.Errorf("Failed to call create_spaces: %s", err) + } + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body diff --git a/uuid/uuid_test.go b/uuid/uuid_test.go index e04ab5ab9..4e63310fa 100644 --- a/uuid/uuid_test.go +++ b/uuid/uuid_test.go @@ -20,7 +20,7 @@ var isUUIDSupported = false var server = "127.0.0.1:3013" var opts = Opts{ - Timeout: 500 * time.Millisecond, + Timeout: 5 * time.Second, User: "test", Pass: "test", }