一、示例
MySQL 的常规查询、游标查询、流式查询,分别对应的 rowData 为 ResultsetRowsStatic、ResultsetRowsCursor、ResultsetRowsStreaming
1.pom
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
2.java
/**
 * Regular (fully buffered) query: no cursor fetch and no streaming fetch size.
 *
 * <p>Expected driver behavior as described by this note: the statement is a
 * ClientPreparedStatement and the ResultSet's rowData is a ResultsetRowsStatic.
 *
 * @see com.mysql.cj.protocol.a.result.NativeResultset#rowData
 */
@Test
public void testStaticQuery() throws Exception {
    String url = "jdbc:mysql://localhost:3306/test1";
    String user = "root";
    String password = "123456";
    String sql = "SELECT * FROM tbl1";
    // try-with-resources closes the ResultSet, statement and connection (in that order)
    // even when the query or the row iteration throws.
    // The String overload of executeQuery is kept so that execution goes through
    // StatementImpl.executeQuery(String), the method analysed in the source section below.
    // NOTE(review): the JDBC API docs say this overload must not be called on a
    // PreparedStatement; Connector/J is assumed to tolerate it - confirm.
    try (Connection conn = DriverManager.getConnection(url, user, password);
         PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
         ResultSet resultSet = statement.executeQuery(sql)) {
        while (resultSet.next()) {
            String str = resultSet.getString(1);
            System.out.println(str);
        }
    }
}
/**
 * Cursor query: useCursorFetch=true in the URL plus a positive fetch size.
 *
 * <p>Expected driver behavior as described by this note: the statement is a
 * ServerPreparedStatement and the ResultSet's rowData is a ResultsetRowsCursor.
 *
 * @see com.mysql.cj.protocol.a.result.NativeResultset#rowData
 */
@Test
public void testCursorQuery() throws Exception {
    String url = "jdbc:mysql://localhost:3306/test1?useCursorFetch=true";
    String user = "root";
    String password = "123456";
    String sql = "SELECT * FROM tbl1";
    // try-with-resources closes every JDBC resource even when the query or iteration throws.
    try (Connection conn = DriverManager.getConnection(url, user, password);
         PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        // The fetch size must be set before the query is executed; a value > 0 is one of
        // the conditions checked by StatementImpl.useServerFetch().
        statement.setFetchSize(20);
        // The String overload is kept so that execution goes through
        // StatementImpl.executeQuery(String), the method analysed in the source section below.
        // NOTE(review): the JDBC API docs say this overload must not be called on a
        // PreparedStatement; Connector/J is assumed to tolerate it - confirm.
        try (ResultSet resultSet = statement.executeQuery(sql)) {
            while (resultSet.next()) {
                String str = resultSet.getString(1);
                System.out.println(str);
            }
        }
    }
}
/**
 * Streaming query: forward-only, read-only statement with fetch size Integer.MIN_VALUE.
 *
 * <p>Expected driver behavior as described by this note: the statement is a
 * ClientPreparedStatement and the ResultSet's rowData is a ResultsetRowsStreaming.
 *
 * @see com.mysql.cj.protocol.a.result.NativeResultset#rowData
 */
@Test
public void testStreamQuery() throws Exception {
    String url = "jdbc:mysql://localhost:3306/test1";
    String user = "root";
    String password = "123456";
    String sql = "SELECT * FROM tbl1";
    // try-with-resources closes every JDBC resource even when the query or iteration throws.
    try (Connection conn = DriverManager.getConnection(url, user, password);
         PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        // Integer.MIN_VALUE is the exact fetch size StatementImpl.createStreamingResultSet()
        // compares against; it must be set before the query is executed.
        statement.setFetchSize(Integer.MIN_VALUE);
        // The String overload is kept so that execution goes through
        // StatementImpl.executeQuery(String), the method analysed in the source section below.
        // NOTE(review): the JDBC API docs say this overload must not be called on a
        // PreparedStatement; Connector/J is assumed to tolerate it - confirm.
        try (ResultSet resultSet = statement.executeQuery(sql)) {
            while (resultSet.next()) {
                String str = resultSet.getString(1);
                System.out.println(str);
            }
        }
    }
}
二、源码
1.设置 useCursorFetch 和 useServerPrepStmts
在 url 配置 useCursorFetch = true
执行 Connection conn = DriverManager.getConnection(url, user, password),若 useCursorFetch = true,则会设置 useServerPrepStmts = true
// PropertyKey.java
// Enum constants for the two connection properties discussed in this note.
// NOTE(review): the enum constructor is not shown; the string argument presumably
// populates keyName and the boolean presumably populates isCaseSensitive - confirm.
useCursorFetch("useCursorFetch", true),
useServerPrepStmts("useServerPrepStmts", true),
// Per-constant fields of the enum (excerpt).
private String keyName;
private String ccAlias = null;
private boolean isCaseSensitive = false;
// PropertyDefinitions.java
// Registry mapping each PropertyKey to its PropertyDefinition. Its key set is what
// DefaultPropertySet.initializeProperties iterates over.
public static final Map<PropertyKey, PropertyDefinition<?>> PROPERTY_KEY_TO_PROPERTY_DEFINITION;
static {
...
// useCursorFetch is declared as a boolean property with DEFAULT_VALUE_FALSE and RUNTIME_MODIFIABLE.
new BooleanPropertyDefinition(PropertyKey.useCursorFetch, DEFAULT_VALUE_FALSE, RUNTIME_MODIFIABLE,
Messages.getString("ConnectionProperties.useCursorFetch"), "5.0.0", CATEGORY_PERFORMANCE, Integer.MAX_VALUE),
// useServerPrepStmts is likewise declared with DEFAULT_VALUE_FALSE and RUNTIME_MODIFIABLE.
new BooleanPropertyDefinition(PropertyKey.useServerPrepStmts, DEFAULT_VALUE_FALSE, RUNTIME_MODIFIABLE,
Messages.getString("ConnectionProperties.useServerPrepStmts"), "3.1.0", CATEGORY_PREPARED_STATEMENTS, Integer.MIN_VALUE),
...
}
// ConnectionImpl.java
/**
 * Constructor excerpt: the only step shown is handing this.props to the property set
 * via initializeProperties. Everything else is elided.
 */
public ConnectionImpl(HostInfo hostInfo) throws SQLException {
try {
...
// Entry point into property initialization for this connection.
this.propertySet.initializeProperties(this.props);
...
}
}
// DefaultPropertySet.java
/**
 * Initializes every registered runtime property from the supplied Properties, then
 * runs postInitialization(). Does nothing when props is null.
 *
 * @param props connection properties; may be null
 */
public void initializeProperties(Properties props) {
if (props != null) {
// Work on a clone rather than on the caller's Properties instance.
Properties infoCopy = (Properties) props.clone();
...
// Visit every key in the property-definition registry and initialize its runtime property.
for (PropertyKey propKey : PropertyDefinitions.PROPERTY_KEY_TO_PROPERTY_DEFINITION.keySet()) {
try {
RuntimeProperty<?> propToSet = getProperty(propKey);
propToSet.initializeFrom(infoCopy, null);
} catch (CJException e) {
// Re-raised as WrongArgumentException, keeping the original message and cause.
throw ExceptionFactory.createException(WrongArgumentException.class, e.getMessage(), e);
}
}
...
// Hook for cross-property adjustments; only reached when props is non-null.
postInitialization();
}
}
// JdbcPropertySetImpl.java
/**
 * Adjusts property values that depend on other properties. In the part shown:
 * a maxRows of 0 is rewritten to -1, and useCursorFetch=true forces
 * useServerPrepStmts to true.
 */
@Override
public void postInitialization() {
// Adjust max rows
if (getIntegerProperty(PropertyKey.maxRows).getValue() == 0) {
// adjust so that it will become MysqlDefs.MAX_ROWS in execSQL()
super.<Integer>getProperty(PropertyKey.maxRows).setValue(Integer.valueOf(-1), null);
}
...
// This is where enabling useCursorFetch implicitly turns on server-side prepared statements.
if (getBooleanProperty(PropertyKey.useCursorFetch).getValue()) {
// assume server-side prepared statements are wanted because they're required for this functionality
super.<Boolean>getProperty(PropertyKey.useServerPrepStmts).setValue(true);
}
}
2.判断 ClientPreparedStatement 和 ServerPreparedStatement
执行 PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) 时,
根据 useServerPrepStmts 的值,判断是使用 ClientPreparedStatement 还是 ServerPreparedStatement
// ConnectionImpl.java
/**
 * Creates a prepared statement, choosing between a server-side and a client-side
 * implementation based on useServerPrepStmts (excerpt; the caching branch and the
 * SQLException handler are elided).
 *
 * @param sql                  the SQL to prepare
 * @param resultSetType        requested result set type
 * @param resultSetConcurrency requested result set concurrency
 * @return the prepared statement
 * @throws SQLException declared by the signature; thrown by the calls inside
 */
@Override
public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
synchronized (getConnectionMutex()) {
checkClosed();
ClientPreparedStatement pStmt = null;
// Assume the statement can be server-prepared unless the check below says otherwise.
boolean canServerPrepare = true;
String nativeSql = this.processEscapeCodesForPrepStmts.getValue() ? nativeSQL(sql) : sql;
// The SQL is only vetted when both useServerPrepStmts and emulateUnsupportedPstmts are enabled.
if (this.useServerPrepStmts.getValue() && this.emulateUnsupportedPstmts.getValue()) {
canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);
}
if (this.useServerPrepStmts.getValue() && canServerPrepare) {
if (this.cachePrepStmts.getValue()) {
...
} else {
try {
// Server-side path: the instance is assigned to a ClientPreparedStatement-typed variable.
pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
pStmt.setResultSetType(resultSetType);
pStmt.setResultSetConcurrency(resultSetConcurrency);
} catch (SQLException sqlEx) {
...
}
}
} else {
// Client-side path; passes false so clientPrepareStatement does not process escape codes again.
pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
}
return pStmt;
}
}
/**
 * Creates a client-side prepared statement (excerpt; the caching branch is elided).
 *
 * @param sql                        the SQL to prepare
 * @param resultSetType              requested result set type, applied to the statement
 * @param resultSetConcurrency       requested result set concurrency, applied to the statement
 * @param processEscapeCodesIfNeeded when true and processEscapeCodesForPrepStmts is enabled,
 *                                   the SQL is passed through nativeSQL first
 * @return the ClientPreparedStatement
 * @throws SQLException declared by the signature; thrown by the calls inside
 */
public java.sql.PreparedStatement clientPrepareStatement(String sql, int resultSetType, int resultSetConcurrency, boolean processEscapeCodesIfNeeded)
throws SQLException {
String nativeSql = processEscapeCodesIfNeeded && this.processEscapeCodesForPrepStmts.getValue() ? nativeSQL(sql) : sql;
ClientPreparedStatement pStmt = null;
if (this.cachePrepStmts.getValue()) {
...
} else {
// Without statement caching a fresh ClientPreparedStatement is created.
pStmt = ClientPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database);
}
// Result set type and concurrency are applied after creation on this path.
pStmt.setResultSetType(resultSetType);
pStmt.setResultSetConcurrency(resultSetConcurrency);
return pStmt;
}
// ServerPreparedStatement.java
/**
 * Static factory that forwards all arguments unchanged to the ServerPreparedStatement constructor.
 *
 * @return a new ServerPreparedStatement
 * @throws SQLException declared by the signature; propagated from the constructor
 */
protected static ServerPreparedStatement getInstance(JdbcConnection conn, String sql, String db, int resultSetType, int resultSetConcurrency)
throws SQLException {
return new ServerPreparedStatement(conn, sql, db, resultSetType, resultSetConcurrency);
}
3.判断 ResultsetRowsStatic、ResultsetRowsStreaming 和 ResultsetRowsCursor
执行 statement.executeQuery(sql) 时
// StatementImpl.java
/**
 * Executes the given SQL and stores the result in this.results (excerpt).
 * When useServerFetch() is true the result comes from createResultSetUsingServerFetch;
 * otherwise it comes from the session's execSQL.
 */
@Override
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
...
// Cursor-fetch path: returns early with the server-fetch result set.
if (useServerFetch()) {
this.results = createResultSetUsingServerFetch(sql);
return this.results;
}
...
// Regular path: the value of createStreamingResultSet() is passed to execSQL.
// NOTE(review): presumably this is the flag that arrives as streamResults in the
// result set readers - execSQL's signature is not shown here, confirm.
this.results = ((NativeSession) locallyScopedConn.getSession()).execSQL(this, sql, this.maxRows, null, createStreamingResultSet(),
getResultSetFactory(), cachedMetaData, false);
...
}
/**
 * Returns true only when all three conditions hold: the useCursorFetch property is
 * enabled, the fetch size is greater than 0, and the result type is FORWARD_ONLY.
 * The check runs while holding the connection mutex obtained via checkClosed().
 */
private boolean useServerFetch() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
return this.session.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue() && this.query.getResultFetchSize() > 0
&& this.query.getResultType() == Type.FORWARD_ONLY;
}
}
/**
 * Returns true only when the result type is FORWARD_ONLY, the concurrency is
 * CONCUR_READ_ONLY, and the fetch size is exactly Integer.MIN_VALUE.
 */
protected boolean createStreamingResultSet() {
return ((this.query.getResultType() == Type.FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
&& (this.query.getResultFetchSize() == Integer.MIN_VALUE));
}
// TextResultsetReader.java
/**
 * Reads a result set (excerpt). For results with columns, streamResults selects the
 * row container: ResultsetRowsStatic when false, ResultsetRowsStreaming when true.
 */
@Override
public Resultset read(int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata,
ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException {
...
if (columnCount > 0) {
...
ResultsetRows rows = null;
if (!streamResults) {
// Non-streaming: every row is read from the protocol up front.
TextRowFactory trf = new TextRowFactory(this.protocol, cdef, resultSetFactory.getResultSetConcurrency(), false);
ArrayList<ResultsetRow> rowList = new ArrayList<>();
ResultsetRow row = this.protocol.read(ResultsetRow.class, trf);
while (row != null) {
// Rows past maxRows are still read but not kept; -1 means no limit.
if ((maxRows == -1) || (rowList.size() < maxRows)) {
rowList.add(row);
}
row = this.protocol.read(ResultsetRow.class, trf);
}
rows = new ResultsetRowsStatic(rowList, cdef);
} else {
// Streaming: no rows are read here; the streaming container is registered with the protocol.
rows = new ResultsetRowsStreaming<>(this.protocol, cdef, false, resultSetFactory);
this.protocol.setStreamingData(rows);
}
...
}
...
}
// BinaryResultsetReader.java
/**
 * Reads a result set (excerpt). For results with columns, the row container is chosen
 * in this order: ResultsetRowsCursor when a cursor is possible and exists on the
 * server session, else ResultsetRowsStatic when streamResults is false, else
 * ResultsetRowsStreaming.
 */
@Override
public Resultset read(int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata,
ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException {
...
if (columnCount > 0) {
...
// The cursor check comes first, so it wins over both the static and streaming branches.
// (isCursorPosible and rows are declared in the elided part.)
if (isCursorPosible && this.protocol.getServerSession().cursorExists()) {
rows = new ResultsetRowsCursor(this.protocol, cdef);
} else if (!streamResults) {
// Non-streaming: every row is read from the protocol up front.
BinaryRowFactory brf = new BinaryRowFactory(this.protocol, cdef, resultSetFactory.getResultSetConcurrency(), false);
ArrayList<ResultsetRow> rowList = new ArrayList<>();
ResultsetRow row = this.protocol.read(ResultsetRow.class, brf);
while (row != null) {
// Rows past maxRows are still read but not kept; -1 means no limit.
if ((maxRows == -1) || (rowList.size() < maxRows)) {
rowList.add(row);
}
row = this.protocol.read(ResultsetRow.class, brf);
}
rows = new ResultsetRowsStatic(rowList, cdef);
} else {
// Streaming: the third constructor argument is true here, whereas TextResultsetReader passes false.
// NOTE(review): presumably a binary-encoding flag - confirm against the constructor.
rows = new ResultsetRowsStreaming<>(this.protocol, cdef, true, resultSetFactory);
this.protocol.setStreamingData(rows);
}
...
}
...
}