2017-09-20
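import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Ask MySQL Connector/J to stream rows instead of buffering the whole result set.
val enableJdbcStreaming: (java.sql.Statement) => Unit = { statement =>
    if (statement.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl])) {
        statement.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
    }
}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val config = ConfigFactory.load()
val db = Database.forConfig("mysql")
val query = Tables.Foo.map(r => (r.id, r.pid)).result
val source = Source.fromPublisher[(Long, Option[Long])](db.stream(query.withStatementParameters(statementInit = enableJdbcStreaming)))
val future = source.runForeach(x => println(x))
import actorSystem.dispatcher
// Release the connection pool and the actor system once the stream finishes.
future.onComplete { _ =>
    db.close()
    actorSystem.terminate()
}
Await.result(future, Duration.Inf)
Await.result(actorSystem.whenTerminated, Duration.Inf)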

Slick 3 MySQL streaming application just hangs... (does not stream data). Library dependencies:

"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.11", 
"com.typesafe.slick" %% "slick" % "3.2.1", 
"com.typesafe.slick" %% "slick-hikaricp" % "3.2.1", 
"mysql" % "mysql-connector-java" % "5.1.44", 
"ch.qos.logback" % "logback-classic" % "1.2.3" 

MySQL configuration:

mysql { 
    profile = "slick.jdbc.MySQLProfile$" 
    dataSourceClass = "slick.jdbc.DatabaseUrlDataSource" 
    properties {
        driver = "com.mysql.jdbc.Driver"
        url = "jdbc:mysql://server:3306/db"
        user = "user"
        password = "password"
    }
    connectionTimeout = 300 
} 

When I run this code, it just keeps printing these lines on the console:

[info] 19:20:31.647 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:21:01.649 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:21:31.656 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:22:01.661 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:22:31.668 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:23:01.674 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:23:31.680 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:24:01.687 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:24:31.693 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:25:01.699 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:25:31.706 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:26:01.708 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 

I waited a long time, but it kept printing these lines.

My expectation was that it would immediately start reading my table row by row.

Answer

I solved this problem myself. I think I had not built the graph correctly.

I rewrote my graph using RunnableGraph. Now it works perfectly:

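import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}

// Reuses db, enableJdbcStreaming and the implicit materializer from the question.
val query = sql"select id from foo".as[Long]
val publisher = db.stream(query.withStatementParameters(statementInit = enableJdbcStreaming))
val source = Source.fromPublisher[Long](publisher)
val sink = Sink.foreach[Long] { x => println(x) }
// Passing the sink to create() makes its materialized Future[Done] the result of run().
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b => s =>
    import GraphDSL.Implicits._
    source ~> s.in
    ClosedShape
})
val future = graph.run()
Await.result(future, Duration.Inf)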
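
For comparison, the GraphDSL wiring here appears to be equivalent to simply calling `source.runWith(sink)`. The sketch below runs both styles and checks that they produce the same elements. It assumes an in-memory `Source` of made-up values in place of the database publisher, and the `WiringComparison` object is a hypothetical name. If the two styles do behave the same, the wiring alone may not explain the earlier hang; the two versions also differ in the query they run (`Tables.Foo.map(...).result` versus `sql"...".as[Long]`).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; not part of the original question or answer.
object WiringComparison {

  // Runs the same source through both wiring styles and returns what each sink collected.
  def run(): (Seq[Long], Seq[Long]) = {
    implicit val system: ActorSystem = ActorSystem("wiring-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for Source.fromPublisher(db.stream(...)); the values are made up.
      val source = Source(List(1L, 2L, 3L))

      // Style 1: plain combinator, as in source.runForeach / source.runWith.
      val viaRunWith = source.runWith(Sink.seq[Long])

      // Style 2: explicit GraphDSL wiring, as in the answer.
      val viaGraphDsl = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Long]) { implicit b => s =>
        import GraphDSL.Implicits._
        source ~> s.in
        ClosedShape
      }).run()

      (Await.result(viaRunWith, 10.seconds), Await.result(viaGraphDsl, 10.seconds))
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = run()
    assert(a == b, "both wiring styles should emit the same elements")
    println(s"runWith: $a, GraphDSL: $b")
  }
}
```

`Sink.seq` is used instead of `Sink.foreach` only so that the results can be compared; the wiring is otherwise the same as in the answer.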

It would still be interesting to know why the previous approach did not work, though.
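
One thing that could be tried when investigating is requesting streaming through Slick's own statement parameters instead of unwrapping the driver's `StatementImpl`. MySQL Connector/J documents that a forward-only, read-only statement with a fetch size of `Integer.MIN_VALUE` is read row by row. The following is a minimal sketch, assuming Slick 3.2's `withStatementParameters` passes `rsType`, `rsConcurrency` and `fetchSize` through to the JDBC statement unchanged. `StreamingParams` and `idsStreaming` are hypothetical names, and this has not been verified against the setup in the question.

```scala
import slick.jdbc.{ResultSetConcurrency, ResultSetType}
import slick.jdbc.MySQLProfile.api._

// Hypothetical helper object; not part of the original question or answer.
object StreamingParams {
  // Same raw query as in the answer; only the statement parameters differ.
  // Building the action does not touch the database.
  val idsStreaming =
    sql"select id from foo".as[Long].withStatementParameters(
      rsType = ResultSetType.ForwardOnly,
      rsConcurrency = ResultSetConcurrency.ReadOnly,
      fetchSize = Integer.MIN_VALUE // Connector/J's signal for row-by-row streaming
    )
}
```

The action would then be passed to `db.stream(StreamingParams.idsStreaming)` and wrapped with `Source.fromPublisher`, exactly as in the code above.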