未分类 · 2025-08-31 0

MySQL的常规查询、游标查询、流式查询

一、示例

MySQL的常规查询、游标查询、流式查询,分别对应的 rowData 为 ResultsetRowsCursor、ResultsetRowsStatic、ResultsetRowsStreaming

1.pom

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

2,java

@Test
public void testStaticQuery() throws Exception {
    String url = "jdbc:mysql://localhost:3306/test1";
    String user = "root";
    String password = "123456";

    Connection conn = DriverManager.getConnection(url, user, password);
    String sql = "SELECT * FROM tbl1";
    PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

    /**
     * statement 是 ClientPreparedStatement
     * 
     * ResultSet 的 rowData 类型是 ResultsetRowsStatic
     *
     * @see com.mysql.cj.protocol.a.result.NativeResultset#rowData
     */
    ResultSet resultSet = statement.executeQuery(sql);

    while (resultSet.next()) {
        String str = resultSet.getString(1);
        System.out.println(str);
    }

    resultSet.close();
    statement.close();
    conn.close();
}

@Test
public void testCursorQuery() throws Exception {
    String url = "jdbc:mysql://localhost:3306/test1?useCursorFetch=true";
    String user = "root";
    String password = "123456";

    Connection conn = DriverManager.getConnection(url, user, password);
    String sql = "SELECT * FROM tbl1";
    PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    statement.setFetchSize(20);

    /**
     * statement 是 ServerPreparedStatement
     *
     * ResultSet 的 rowData 类型是 ResultsetRowsCursor
     *
     * @see com.mysql.cj.protocol.a.result.NativeResultset#rowData
     */
    ResultSet resultSet = statement.executeQuery(sql);

    while (resultSet.next()) {
        String str = resultSet.getString(1);
        System.out.println(str);
    }

    resultSet.close();
    statement.close();
    conn.close();
}

@Test
public void testStreamQuery() throws Exception {
    String url = "jdbc:mysql://localhost:3306/test1";
    String user = "root";
    String password = "123456";

    Connection conn = DriverManager.getConnection(url, user, password);
    String sql = "SELECT * FROM tbl1";
    PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    statement.setFetchSize(Integer.MIN_VALUE);

    /**
     * statement 是 ClientPreparedStatement
     *
     * ResultSet 的 rowData 类型是 ResultsetRowsStreaming
     *
     * @see com.mysql.cj.protocol.a.result.NativeResultset#rowData
     */
    ResultSet resultSet = statement.executeQuery(sql);

    while (resultSet.next()) {
        String str = resultSet.getString(1);
        System.out.println(str);
    }

    resultSet.close();
    statement.close();
    conn.close();
}

二、源码

1.设置 useCursorFetch 和 useServerPrepStmts

在 url 配置 useCursorFetch = true
执行 Connection conn = DriverManager.getConnection(url, user, password),若 useCursorFetch = true,则会设置 useServerPrepStmts = true

// PropertyKey.java
useCursorFetch("useCursorFetch", true),
useServerPrepStmts("useServerPrepStmts", true),

private String keyName;
private String ccAlias = null;
private boolean isCaseSensitive = false;
// PropertyDefinitions.java
public static final Map<PropertyKey, PropertyDefinition<?>> PROPERTY_KEY_TO_PROPERTY_DEFINITION;

static {
    ...
    new BooleanPropertyDefinition(PropertyKey.useCursorFetch, DEFAULT_VALUE_FALSE, RUNTIME_MODIFIABLE,
            Messages.getString("ConnectionProperties.useCursorFetch"), "5.0.0", CATEGORY_PERFORMANCE, Integer.MAX_VALUE),
    new BooleanPropertyDefinition(PropertyKey.useServerPrepStmts, DEFAULT_VALUE_FALSE, RUNTIME_MODIFIABLE,
            Messages.getString("ConnectionProperties.useServerPrepStmts"), "3.1.0", CATEGORY_PREPARED_STATEMENTS, Integer.MIN_VALUE),
    ...
}
// ConnectionImpl.java
public ConnectionImpl(HostInfo hostInfo) throws SQLException {
    try {
        ...
        this.propertySet.initializeProperties(this.props);
        ...
    }
}
// DefaultPropertySet.java
public void initializeProperties(Properties props) {
    if (props != null) {
        Properties infoCopy = (Properties) props.clone();
        ...
        for (PropertyKey propKey : PropertyDefinitions.PROPERTY_KEY_TO_PROPERTY_DEFINITION.keySet()) {
            try {
                RuntimeProperty<?> propToSet = getProperty(propKey);
                propToSet.initializeFrom(infoCopy, null);

            } catch (CJException e) {
                throw ExceptionFactory.createException(WrongArgumentException.class, e.getMessage(), e);
            }
        }
        ...
        postInitialization();
    }
}
// JdbcPropertySetImpl.java
@Override
public void postInitialization() {

    // Adjust max rows
    if (getIntegerProperty(PropertyKey.maxRows).getValue() == 0) {
        // adjust so that it will become MysqlDefs.MAX_ROWS in execSQL()
        super.<Integer>getProperty(PropertyKey.maxRows).setValue(Integer.valueOf(-1), null);
    }
    ...
    if (getBooleanProperty(PropertyKey.useCursorFetch).getValue()) {
        // assume server-side prepared statements are wanted because they're required for this functionality
        super.<Boolean>getProperty(PropertyKey.useServerPrepStmts).setValue(true);
    }
}

2.判断 ClientPreparedStatement 和 ServerPreparedStatement

执行 PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) 时,
根据 useServerPrepStmts 的值,判断是使用 ClientPreparedStatement 和 ServerPreparedStatement

// ConnectionImpl.java
@Override
public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
    synchronized (getConnectionMutex()) {
        checkClosed();

        ClientPreparedStatement pStmt = null;

        boolean canServerPrepare = true;

        String nativeSql = this.processEscapeCodesForPrepStmts.getValue() ? nativeSQL(sql) : sql;

        if (this.useServerPrepStmts.getValue() && this.emulateUnsupportedPstmts.getValue()) {
            canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);
        }

        if (this.useServerPrepStmts.getValue() && canServerPrepare) {
            if (this.cachePrepStmts.getValue()) {
                ...
            } else {
                try {
                    pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);

                    pStmt.setResultSetType(resultSetType);
                    pStmt.setResultSetConcurrency(resultSetConcurrency);
                } catch (SQLException sqlEx) {
                    ...
                }
            }
        } else {
            pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
        }

        return pStmt;
    }
}

public java.sql.PreparedStatement clientPrepareStatement(String sql, int resultSetType, int resultSetConcurrency, boolean processEscapeCodesIfNeeded)
        throws SQLException {

    String nativeSql = processEscapeCodesIfNeeded && this.processEscapeCodesForPrepStmts.getValue() ? nativeSQL(sql) : sql;

    ClientPreparedStatement pStmt = null;

    if (this.cachePrepStmts.getValue()) {
        ...
    } else {
        pStmt = ClientPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database);
    }

    pStmt.setResultSetType(resultSetType);
    pStmt.setResultSetConcurrency(resultSetConcurrency);

    return pStmt;
}
// ServerPreparedStatement.java
protected static ServerPreparedStatement getInstance(JdbcConnection conn, String sql, String db, int resultSetType, int resultSetConcurrency)
        throws SQLException {
    return new ServerPreparedStatement(conn, sql, db, resultSetType, resultSetConcurrency);
}

3.判断 ResultsetRowsStatic、ResultsetRowsStreaming 和 ResultsetRowsCursor

执行 statement.executeQuery(sql) 时

// StatementImpl.java
@Override
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
    ...
    if (useServerFetch()) {
        this.results = createResultSetUsingServerFetch(sql);

        return this.results;
    }

    ... 
    this.results = ((NativeSession) locallyScopedConn.getSession()).execSQL(this, sql, this.maxRows, null, createStreamingResultSet(),
            getResultSetFactory(), cachedMetaData, false);
    ...
}

private boolean useServerFetch() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        return this.session.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue() && this.query.getResultFetchSize() > 0
                && this.query.getResultType() == Type.FORWARD_ONLY;
    }
}

protected boolean createStreamingResultSet() {
    return ((this.query.getResultType() == Type.FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
            && (this.query.getResultFetchSize() == Integer.MIN_VALUE));
}
// TextResultsetReader.java
@Override
public Resultset read(int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata,
        ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException {
        ...
        if (columnCount > 0) {
            ...
            ResultsetRows rows = null;

            if (!streamResults) {
                TextRowFactory trf = new TextRowFactory(this.protocol, cdef, resultSetFactory.getResultSetConcurrency(), false);
                ArrayList<ResultsetRow> rowList = new ArrayList<>();

                ResultsetRow row = this.protocol.read(ResultsetRow.class, trf);
                while (row != null) {
                    if ((maxRows == -1) || (rowList.size() < maxRows)) {
                        rowList.add(row);
                    }
                    row = this.protocol.read(ResultsetRow.class, trf);
                }

                rows = new ResultsetRowsStatic(rowList, cdef);

            } else {
                rows = new ResultsetRowsStreaming<>(this.protocol, cdef, false, resultSetFactory);
                this.protocol.setStreamingData(rows);
            }       
          ...
        }
        ...
}
// BinaryResultsetReader.java
@Override
public Resultset read(int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata,
    ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException {
    ...
    if (columnCount > 0) {
        ...
        if (isCursorPosible && this.protocol.getServerSession().cursorExists()) {
            rows = new ResultsetRowsCursor(this.protocol, cdef);

        } else if (!streamResults) {
            BinaryRowFactory brf = new BinaryRowFactory(this.protocol, cdef, resultSetFactory.getResultSetConcurrency(), false);

            ArrayList<ResultsetRow> rowList = new ArrayList<>();
            ResultsetRow row = this.protocol.read(ResultsetRow.class, brf);
            while (row != null) {
                if ((maxRows == -1) || (rowList.size() < maxRows)) {
                    rowList.add(row);
                }
                row = this.protocol.read(ResultsetRow.class, brf);
            }

            rows = new ResultsetRowsStatic(rowList, cdef);

        } else {
            rows = new ResultsetRowsStreaming<>(this.protocol, cdef, true, resultSetFactory);
            this.protocol.setStreamingData(rows);
        }
       ...
    }
    ...
}