Skip to content

Commit 0ce4443

Browse files
committed
sql/pgwire: implicitly destroy portal on txn finish
Release note (sql change): Previously, committing a transaction when a portal was suspended would cause a "multiple active portals not supported" error. Now, the portal is automatically destroyed.
1 parent 5477fd3 commit 0ce4443

File tree

3 files changed

+577
-12
lines changed

3 files changed

+577
-12
lines changed

pkg/acceptance/testdata/java/src/main/java/MainTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import java.math.BigDecimal;
88
import java.sql.Array;
99
import java.sql.Connection;
10+
import java.sql.DriverManager;
1011
import java.sql.PreparedStatement;
1112
import java.sql.ResultSet;
1213
import java.sql.ResultSetMetaData;
14+
import java.sql.Statement;
1315
import java.text.SimpleDateFormat;
1416
import java.util.ArrayList;
1517
import java.util.List;
@@ -339,4 +341,45 @@ public void testVirtualTableMetadata() throws Exception {
339341
}
340342
}
341343
}
344+
345+
// Regression test for #42912: using setFetchSize in a transaction should
346+
// not cause issues.
347+
@Test
348+
public void testSetFetchSize() throws Exception {
349+
for (int fetchSize = 0; fetchSize <= 3; fetchSize++) {
350+
testSetFetchSize(fetchSize, true);
351+
testSetFetchSize(fetchSize, false);
352+
}
353+
}
354+
355+
private void testSetFetchSize(int fetchSize, boolean useTransaction) throws Exception {
356+
int expectedResults = fetchSize;
357+
if (fetchSize == 0 || fetchSize == 3) {
358+
expectedResults = 2;
359+
}
360+
361+
try (final Connection testConn = DriverManager.getConnection(getDBUrl(), "root", "")) {
362+
testConn.setAutoCommit(!useTransaction);
363+
try (final Statement stmt = testConn.createStatement()) {
364+
stmt.setFetchSize(fetchSize);
365+
ResultSet result = stmt.executeQuery("select n from generate_series(0,1) n");
366+
for (int i = 0; i < expectedResults; i++) {
367+
Assert.assertTrue(result.next());
368+
Assert.assertEquals(i, result.getInt(1));
369+
}
370+
if (useTransaction) {
371+
// This should implicitly close the ResultSet (i.e. portal).
372+
testConn.commit();
373+
if (fetchSize != 0 && fetchSize != 3) {
374+
try {
375+
result.next();
376+
fail("expected portal to be closed");
377+
} catch (PSQLException e) {
378+
Assert.assertTrue(e.getMessage().contains("unknown portal"));
379+
}
380+
}
381+
}
382+
}
383+
}
384+
}
342385
}

pkg/sql/pgwire/command_result.go

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -442,24 +442,15 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
442442
}
443443
switch c := cmd.(type) {
444444
case sql.DeletePreparedStmt:
445-
// The client wants to close a portal or statement. We
446-
// support the case where it is exactly this
447-
// portal. This is done by closing the portal in
448-
// the same way implicit transactions do, but also
449-
// rewinding the stmtBuf to still point to the portal
450-
// close so that the state machine can do its part of
451-
// the cleanup. We are in effect peeking to see if the
445+
// The client wants to close a portal or statement. We support the case
446+
// where it is exactly this portal. We are in effect peeking to see if the
452447
// next message is a delete portal.
453448
if c.Type != pgwirebase.PreparePortal || c.Name != r.portalName {
454449
telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter)
455450
return errors.WithDetail(sql.ErrLimitedResultNotSupported,
456451
"cannot close a portal while a different one is open")
457452
}
458-
r.typ = noCompletionMsg
459-
// Rewind to before the delete so the AdvanceOne in
460-
// connExecutor.execCmd ends up back on it.
461-
r.conn.stmtBuf.Rewind(ctx, prevPos)
462-
return sql.ErrLimitedResultClosed
453+
return r.rewindAndClosePortal(ctx, prevPos)
463454
case sql.ExecPortal:
464455
// The happy case: the client wants more rows from the portal.
465456
if c.Name != r.portalName {
@@ -483,6 +474,13 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
483474
return err
484475
}
485476
default:
477+
// If the portal is immediately followed by a COMMIT, we can proceed and
478+
// let the portal be destroyed at the end of the transaction.
479+
if isCommit, err := r.isCommit(); err != nil {
480+
return err
481+
} else if isCommit {
482+
return r.rewindAndClosePortal(ctx, prevPos)
483+
}
486484
// We got some other message, but we only support executing to completion.
487485
telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter)
488486
return errors.WithDetail(sql.ErrLimitedResultNotSupported,
@@ -491,3 +489,77 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
491489
prevPos = curPos
492490
}
493491
}
492+
493+
// isCommit checks if the statement buffer has a COMMIT at the current
494+
// position. It may either be (1) a COMMIT in the simple protocol, or (2) a
495+
// Parse/Bind/Execute sequence for a COMMIT query.
496+
func (r *limitedCommandResult) isCommit() (bool, error) {
497+
cmd, _, err := r.conn.stmtBuf.CurCmd()
498+
if err != nil {
499+
return false, err
500+
}
501+
// Case 1: Check if cmd is a simple COMMIT statement.
502+
if execStmt, ok := cmd.(sql.ExecStmt); ok {
503+
if _, isCommit := execStmt.AST.(*tree.CommitTransaction); isCommit {
504+
return true, nil
505+
}
506+
}
507+
508+
commitStmtName := ""
509+
commitPortalName := ""
510+
// Case 2a: Check if cmd is a prepared COMMIT statement.
511+
if prepareStmt, ok := cmd.(sql.PrepareStmt); ok {
512+
if _, isCommit := prepareStmt.AST.(*tree.CommitTransaction); isCommit {
513+
commitStmtName = prepareStmt.Name
514+
} else {
515+
return false, nil
516+
}
517+
} else {
518+
return false, nil
519+
}
520+
521+
r.conn.stmtBuf.AdvanceOne()
522+
cmd, _, err = r.conn.stmtBuf.CurCmd()
523+
if err != nil {
524+
return false, err
525+
}
526+
// Case 2b: The next cmd must be a bind command.
527+
if bindStmt, ok := cmd.(sql.BindStmt); ok {
528+
// This bind command must be for the COMMIT statement that we just saw.
529+
if bindStmt.PreparedStatementName == commitStmtName {
530+
commitPortalName = bindStmt.PortalName
531+
} else {
532+
return false, nil
533+
}
534+
} else {
535+
return false, nil
536+
}
537+
538+
r.conn.stmtBuf.AdvanceOne()
539+
cmd, _, err = r.conn.stmtBuf.CurCmd()
540+
if err != nil {
541+
return false, err
542+
}
543+
// Case 2c: The next cmd must be an exec portal command.
544+
if execPortal, ok := cmd.(sql.ExecPortal); ok {
545+
// This exec command must be for the portal that was just bound.
546+
if execPortal.Name == commitPortalName {
547+
return true, nil
548+
}
549+
}
550+
return false, nil
551+
}
552+
553+
// rewindAndClosePortal closes the portal in the same way implicit transactions
554+
// do, but also rewinds the stmtBuf to still point to the portal close so that
555+
// the state machine can do its part of the cleanup.
556+
func (r *limitedCommandResult) rewindAndClosePortal(
557+
ctx context.Context, rewindTo sql.CmdPos,
558+
) error {
559+
// Don't send an CommandComplete for the portal; it got suspended.
560+
r.typ = noCompletionMsg
561+
// Rewind to before the delete so the AdvanceOne in connExecutor.execCmd ends
562+
// up back on it.
563+
r.conn.stmtBuf.Rewind(ctx, rewindTo)
564+
return sql.ErrLimitedResultClosed
565+
}

0 commit comments

Comments
 (0)