(因为你的问题是关于建立一个流源,这个答案只涉及流媒体支持的发布商方面,而忽略了用户侧)。
支持流媒体要求数据库返回查询结果几排通常基于光标,而不是一次全部。不同的数据库有不同的方式来启用它。 ScalikeJDBC本身支持对MySQL和PostgreSQL驱动程序使用流式iterator
方法。也就是说,与MySQL和PostgreSQL驱动,以下工作:
import scalikejdbc._
import scalikejdbc.streams._
// set up a connection pool
import scala.concurrent.ExecutionContext.Implicits.global
val publisher: DatabasePublisher[Int] = DB.readOnlyStream {
sql"select id from users order by id".map(r => r.get[Int]("id")).iterator
}
上述作品MySQL和PostgreSQL因为this:
/**
* Forcibly changes the database session to be cursor query ready.
*/
val defaultDBSessionForceAdjuster: DBSessionForceAdjuster = (session) => {
// setup required settings to enable cursor operations
session.connectionAttributes.driverName match {
case Some(driver) if driver == "com.mysql.jdbc.Driver" && session.fetchSize.exists(_ > 0) =>
/*
* MySQL - https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
*
* StreamAction.StreamingInvoker prepares the following required settings in advance:
*
* - java.sql.ResultSet.TYPE_FORWARD_ONLY
* - java.sql.ResultSet.CONCUR_READ_ONLY
*
* If the fetchSize is set as 0 or less, we need to forcibly change the value with the Int min value.
*/
session.fetchSize(Int.MinValue)
case Some(driver) if driver == "org.postgresql.Driver" =>
/*
* PostgreSQL - https://jdbc.postgresql.org/documentation/94/query.html
*
* - java.sql.Connection#autocommit false
* - java.sql.ResultSet.TYPE_FORWARD_ONLY
*/
session.conn.setAutoCommit(false)
case _ =>
}
}
注意,最后case
条款意味着ScalikeJDBC做不是默认情况下支持流驱动iterator
与驱动程序以外的MySQL和PostgreSQL。
这并不意味着不能使用其他驱动程序进行流式传输。您引用的文件的部分有下面的代码示例:
val publisher: DatabasePublisher[Int] = DB readOnlyStream {
sql"select id from users".map(r => r.int("id"))
.iterator
.withDBSessionForceAdjuster(session => {
session.conn.setAutoCommit(true)
})
}
什么文档说的是,使流比MySQL和PostgreSQL等数据库,你需要定制DBSession
属性,如在上面的例子,使得光标支持被启用。究竟是什么这种定制所需要的(例如,调整fetchSize
或在连接上禁用autoCommit
)取决于驱动程序(假设驱动程序支持一次检索少量行的查询结果)。