Skip to content

Question: Merger 抽象相关逻辑语义问题及批量查询实现问题 #165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
longyue0521 opened this issue Mar 3, 2023 · 10 comments
Closed
Labels
question Further information is requested
Milestone

Comments

@longyue0521
Copy link
Collaborator

抽象定义问题

当前Merger和Rows抽象定义如下

type Merger interface {
	Merge(ctx context.Context, results []*sql.Rows) (Rows, error)
}

type Rows interface {
	Next() bool
	Scan(dest ...any) error
	Close() error
	Columns() ([]string, error)
}
  1. Rows中方法是sql.Rows的方法的子集,方法签名相同但逻辑语义却不同(见下方MergerRows实现问题),在后续使用Rows会让人困惑。是否可以让Rows中与sql.Rows相同的方法保持一致的逻辑语义,既符合直觉使用者不用改变习惯也能是代码根据扩展性。换句话说,重新定义Rows语义达到让sql.Rows实现了Rows接口的效果。
  2. 然后将Merge方法中的*sql.Rows替换为Rows —— Merge(ctx context.Context, group []Rows) (Rows, error) 使其更具通用性、组合性、扩展性。

批量查询实现问题

  1. Merger实现
type Merger struct{}

func (Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	if len(results) == 0 {
		return nil, errs.ErrMergerEmptyRows
	}
	for i := 0; i < len(results); i++ {
                // 1. 为了使超时控制尽可能准确,每次循环前都要check一下ctx.Err()
                // 2. 对于sql.Rows类型的results[i]还要检查results[i].NextResultSet(), 有的sql.Rows中会包含多个结果集
                // Next()只在当前结果集中遍历,这里要考虑是否允许results[i]有多个结果集的情况,不允许就报错
		if results[i] == nil {
			return nil, errs.ErrMergerRowsIsNull
		}
	}
	return &MergerRows{
		rows: results,
		mu:   &sync.RWMutex{},
	}, nil
}
  1. MergerRows实现
type MergerRows struct {
	rows []*sql.Rows
	cnt  int
	mu   *sync.RWMutex
	once sync.Once
}

func (m *MergerRows) Next() bool {
	m.mu.RLock()
	if m.cnt >= len(m.rows) {
		m.mu.RUnlock()
		return false
	}
        // 与下方Scan并发会丢失行
	if m.rows[m.cnt].Next() {
		m.mu.RUnlock()
		return true
	}
	m.mu.RUnlock()
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.cnt >= len(m.rows) {
		return false
	}
	if m.rows[m.cnt].Next() {
		return true
	}
        // 如果支持多结果集,这里在更新m.cnt前需要调用NextResultSet()
	for {
		m.cnt++
		if m.cnt >= len(m.rows) {
			break
		}
		if m.rows[m.cnt].Next() {
			return true
		}
                // 如果支持多结果集,这里在更新m.cnt前需要调用NextResultSet()
	}
	return false

}

func (m *MergerRows) Scan(dest ...any) error {
	m.mu.RLock()
	defer m.mu.RUnlock()
        // 与上方Next中的m.rows[m.cnt].Next()会有并发问题
	return m.rows[m.cnt].Scan(dest...)
}

func (m *MergerRows) Close() error {
	var err error
	m.once.Do(func() {
		for i := 0; i < len(m.rows); i++ {
			row := m.rows[i]
			err = row.Close()
                        // 是否该保存一个err,然后向后继续遍历关闭下一个rows
			if err != nil {
				return
			}
		}
	})
	return err
}

func (m *MergerRows) Columns() ([]string, error) {
	return m.rows[0].Columns()
}
  • Scan()与Next()方法并发问题导致的丢失行问题,g1在Scan()方法的m.rows[m.cnt].Scan(dest...)处,g2执行到Next()的第一个if m.rows[m.cnt].Next() {...}把本该由g1读取走的缓存行用新行给覆盖掉了。如果MergerRows与sql.Rows语义一致即不能用于并发场景,那么读写锁m.mu的作用是什么?
  • 如果允许有多个结果集的情况,那么Next()方法中当m.rows[m.cnt].Next()返回false时,需要调用其上的NextResultSet()检查/获取下一个结果集
  • Close()方法,遇到第一个m.rows[i].Close()报错就返回似乎有些不妥,是否应该暂时记录下err,继续向后遍历关闭后续m.rows[i+1...N]。另外sql.Rows的Close是幂等的,所以多次调用也是安全的。
  • 缺少Err() 方法,merger.Rows的使用方法应该与sql.Rows一致,即在用for rows.Next() { rows.Scan(...) }之后用rows.Err()来检查迭代期间遇到的错误。

你的问题

你使用的是 eorm 哪个版本?

你设置的的 Go 环境?

上传 go env 的结果

@longyue0521 longyue0521 added the question Further information is requested label Mar 3, 2023
@longyue0521 longyue0521 changed the title 分库分表:Merger 抽象相关逻辑语义问题及批量查询实现问题 Question: Merger 抽象相关逻辑语义问题及批量查询实现问题 Mar 3, 2023
@juniaoshaonian
Copy link
Collaborator

关于抽象定义问题:明哥说暂时先定义这些用到的方法
批量查询实现问题:关于加锁的问题,好像确实不用加,我等会改改。Close方法如果说将每一个错误都记录下来的话。最后我将所有的error拼起来你觉得这样还行不

@longyue0521
Copy link
Collaborator Author

关于抽象定义问题:明哥说暂时先定义这些用到的方法 批量查询实现问题:关于加锁的问题,好像确实不用加,我等会改改。Close方法如果说将每一个错误都记录下来的话。最后我将所有的error拼起来你觉得这样还行不

Err()根据sql.Rows的设计思路,在用for row.Next() { rows.Scan(...) }之后应该用if err := rows.Err(); err != nil 来检查遍历rows过程中的错误. 所以需要在merger.Rows中添加,当然这基于一个前提假设,将merger.Rows设计为*sql.Rows的抽象.

暂时先别改 等一下 @flycash 的回复吧

@flycash
Copy link
Contributor

flycash commented Mar 4, 2023

Scan()与Next()方法并发问题导致的丢失行问题
这个我看了一下,确实有这个问题,可以考虑整个 next 都改为写锁。我想到 next 本身语义就是游标前移,那么用写锁应该是对的;

Close 方法
实际上这里面可以设计成遇到一个 error 就返回,也可以是考虑引入 multierr 来处理,这个倒无所谓

NextResultSet 问题
这倒是我的知识盲点了,什么情况下会分成多批数据?类似于 cursor 的那种用法?还是说类似于延迟返回的用法?目前其实我有点怀疑我们要不要支持到这个地步。如果支持 NextResultSet,那么大概就是一个双重循环,倒也不是很复杂。不过我又想到,对于大批量数据来说,NextResultSet 应该很好用,就是说分批返回结果集。@longyue0521 可以针对这个 NextResultSet 做一个简单的调研吗?就是确认它用来干啥,有什么特点

Rows 要保证和 sql.Rows 的语义
这个我是赞同的——虽然我觉得将来可能很难保持住,但是现在站在设计的角度,还是要保证的。包括超时检测,确实应该要每个循环都检测一下。因为我们可以预期,在复杂的分库分表查询里面,结果集处理会非常耗时,那么就应该主动检测一下。

Err 方法加上去吧
@juniaoshaonian 可以检查一下 sql.Rows 有什么公开方法,然后我们都加上一个对应的版本,我觉得这样比较好。

我还有没有遗漏任何问题?

@longyue0521
Copy link
Collaborator Author

@flycash

  1. Merger的merge方法签名修改的问题,是否可以将[]*sql.Rows替换为[]Rows,新方法签名:merge(ctx context.Context, group []Rows) (Rows, error),但这就要求Rows接口的所有实现都要实现最小语义范围(保持与sql.Rows的语义相同),实现者可扩大但不能缩小语义范围。比如:sql.Rows不能用在并发场景,其Next和Scan会共享一个切片。咱们的这个MergerRows实现在并发和非并发场景下都能用,这是可以的。
  2. sql.Rows的Close 方法描述如下

Close closes the Rows, preventing further enumeration. If Next is called and returns false and there are no further result
sets, the Rows are closed automatically and it will suffice to check the result of Err. Close is idempotent and does not affect
the result of Err.

Close的主要作用是防止进一步枚举即用户再次调用Next/Scan获取数据,咱们的mergerRows也要达到这个效果,所以我认为一定要遍历全部的[]sql.Rows并调用其Close()方法,做的全面点就将此遍历过程中的err全部记录再返回,简单点就记录第一个再返回。但不能碰到一个sql.Rows的Close方法返回error就中断循环不再遍历了。没有调用Close的sql.Rows仍能被使用,这不符合上面的Close语义。
3. 好的,我会调研一下NextResultSet的场景及特点

@flycash
Copy link
Contributor

flycash commented Mar 4, 2023

  1. 我觉得这个不是很适合。因为用户不知道怎么构建这个 Rows 对象。从用途上来说,Rows 对象是我们返回给用户的,一个已经合并好的结果集。最开始的时候我设想的是直接返回 sql.Rows,是因为我不希望用户过多知道我内部用了啥。而且,从分库分表的角度来说,在执行期间,用户拿到的就是 sql.Rows
  2. 那就可以尽力 Close,我们使用 multierr。这么说起来,我们要保持语义,还需要在 Next() 的时候如果发现没有数据了,还得顺手 Close 了

@juniaoshaonian
Copy link
Collaborator

juniaoshaonian commented Mar 4, 2023

sqlrows的方法除了已经定义的还有

  1. ColumnTypes() 返回的是sql.rows包自己定义的对象,是一些查看列的信息的方法,
  • Name(),
  • DatabaseTypeName(),
  • ScanType()ScanType返回一个适合使用Rows.Scan扫描的Go类型。
  • DecimalSize() 返回是否是浮点数返回精度
  • Length
  • Nullable()
  1. NextResultSet()
  2. Err()

@longyue0521
Copy link
Collaborator Author

longyue0521 commented Mar 5, 2023

  1. 我觉得这个不是很适合。因为用户不知道怎么构建这个 Rows 对象。从用途上来说,Rows 对象是我们返回给用户的,一个已经合并好的结果集。最开始的时候我设想的是直接返回 sql.Rows,是因为我不希望用户过多知道我内部用了啥。而且,从分库分表的角度来说,在执行期间,用户拿到的就是 sql.Rows
  • 你说的有道理。 我是基于Rows接口类型的语义与sql.Rows保持一致的假设进一步想——未来会不会有多个合并好的结果集[]merger.Rows再次合并的需求,才会建议修改merge API的。
  1. 那就可以尽力 Close,我们使用 multierr。这么说起来,我们要保持语义,还需要在 Next() 的时候如果发现没有数据了,还得顺手 Close 了
  • multierr 指的是这个库吧 github.com/uber-go/multierr
  • 其实也不用因为results[i].Next()返回false时就自动关了,咱们的Rows.Next()发现没数据等价于results[i].Next()都没数据也就自动关了。

@longyue0521
Copy link
Collaborator Author

关于NextResultSet的调研结果如下:

Go的database/sql只支持一条查询语句返回一个结果集,不支持一条查询语句返回多个结果集;返回多个结果集的情况——SELECT XXXX; SELECT YYYY; 这样的查询语句会返回多个结果集。

详情见下方学习资料1

可能出现的问题

  1. 不同结果集列不同——误用/恶意拼接不同查询

用户使用SELECT yyyyy; SELECT xxxxx ; 来得到slq.Rows,一般来说两个结果集的列是不相同的。这种情况不好检测,所以在Merger.merge方法上要注明sql.Rows的前置条件即用户要承担的职责。

  1. 不同结果集列相同——拼接分页查询

用户可能使用SELECT xxx limit 0, 10; SELECT xxx limit 10, 10; .... (拼接),这种情况一个sql.Rows中会有多个结果集好消息是结果集的列是相同的,此时我们就需要处理NextResultSet()。

  1. 存储过程
  1. mySQL、postgreSQL开发包对NextResultSet的语义支持可能不一致

综上,也许我们应该强制要求一个SELECT只对应一个sql.Rows且不支持存储过程,这样就不用处理NextResultSet了。

学习资料

  1. 根据issue database/sql: add support for returning multiple result sets golang/go#12382 Go官方添加了相关功能,整个功能的添加及review过程 https://go-review.googlesource.com/c/go/+/30592/ 有助于深入理解NextResultSet的目的及作用

  2. 用go-sqlmock测试NextResultSet的案例 https://github.com/DATA-DOG/go-sqlmock/blob/master/rows_go18_test.go#L14

  3. lib/pq postgreSQL案例 https://github.com/lib/pq/blob/master/go18_test.go#L14

  4. go-sql-driver/mysql 使用案例 https://github.com/go-sql-driver/mysql/blob/master/driver_test.go#L2223

  5. Go官方Example

func ExampleDB_Query_multipleResultSets() {
	age := 27
	q := `
create temp table uid (id bigint); -- Create temp table for queries.
insert into uid
select id from users where age < ?; -- Populate temp table.

-- First result set.
select
	users.id, name
from
	users
	join uid on users.id = uid.id
;

-- Second result set.
select 
	ur.user, ur.role
from
	user_roles as ur
	join uid on uid.id = ur.user
;
	`
	rows, err := db.Query(q, age)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			id   int64
			name string
		)
		if err := rows.Scan(&id, &name); err != nil {
			log.Fatal(err)
		}
		log.Printf("id %d name is %s\n", id, name)
	}
	if !rows.NextResultSet() {
		log.Fatalf("expected more result sets: %v", rows.Err())
	}
	var roleMap = map[int64]string{
		1: "user",
		2: "admin",
		3: "gopher",
	}
	for rows.Next() {
		var (
			id   int64
			role int64
		)
		if err := rows.Scan(&id, &role); err != nil {
			log.Fatal(err)
		}
		log.Printf("id %d has role %s\n", id, roleMap[role])
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
  1. Go官方tutorial Handling multiple result sets

When your database operation might return multiple result sets, you can retrieve those by using Rows.NextResultSet. This can be useful, for example, when you’re sending SQL that separately queries multiple tables, returning a result set for each.

Rows.NextResultSet prepares the next result set so that a call to Rows.Next retrieves the first row from that next set. It returns a boolean indicating whether there is a next result set at all.

Code in the following example uses DB.Query to execute two SQL statements. The first result set is from the first query in the procedure, retrieving all of the rows in the album table. The next result set is from the second query, retrieving rows from the song table.

rows, err := db.Query("SELECT * from album; SELECT * from song;")
if err != nil {
    log.Fatal(err)
}
defer rows.Close()

// Loop through the first result set.
for rows.Next() {
    // Handle result set.
}

// Advance to next result set.
rows.NextResultSet()

// Loop through the second result set.
for rows.Next() {
    // Handle second set.
}

// Check for any error in either result set.
if err := rows.Err(); err != nil {
    log.Fatal(err)
}

@flycash
Copy link
Contributor

flycash commented Mar 5, 2023

感谢@longyue0521 的调研。

我觉得我们的目标是保持和 sql.Rows 语义一样,但是并不意味着我们现在就要支持全部的 sql.Rows 的所有的方法。我准备采取一种延迟支持的策略,即只有我们需要的时候再支持。而如果我们支持了,就要保持和 sql.Rows 的语义一致。

所以@juniaoshaonian 你可以先按照所需方法的最小集来支持,然后每一个方法都保持和 sql.Rows 的语义一致,所以请在具体的 rows 实现里面支持 NextResultSet。

从理论上来说,我们的分库分表功能永远不会走进去 NextResultSet,但是如果考虑将 merger 将来作为一个单独的项目拆出去的话,那么还是要考虑万一就有人不按照套路出牌呢。

@longyue0521
Copy link
Collaborator Author

感谢@longyue0521 的调研。

我觉得我们的目标是保持和 sql.Rows 语义一样,但是并不意味着我们现在就要支持全部的 sql.Rows 的所有的方法。我准备采取一种延迟支持的策略,即只有我们需要的时候再支持。而如果我们支持了,就要保持和 sql.Rows 的语义一致。

不客气 ^_^ ,@juniaoshaonian 在Rows的定义上方把这个策略及语义要求描述一下以便后续维护者理解、维护。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
Status: Done
Development

No branches or pull requests

3 participants