0

我在AWS上的雅典娜数据库上有一个很大的数据集。我想从它并行读取,我习惯了foreach包的方法来从分叉内R.从雅典娜(AWS)数据库并行读取,通过R

我使用RJDBC

这里就是我想:

out <- foreach(i = 1:length(fipsvec), .combine = rbind, .errorhandling = "remove") %dopar% { 
    coni <- dbConnect(driver, "jdbc:awsathena://<<location>>/", 
      s3_staging_dir="my_directory", 
      user="...", 
      password="...") 
    print(paste0("starting ", i)) 
    sqlstring <- paste0("SELECT ", 
      "My_query_body" 
      fipsvec[i] 
    ) 
    row <- fetch(dbSendQuery(coni, sqlstring), -1, block = 999)   
    print(i) 
    dbDisconnect(coni) 
    rm(coni) 
    gc() 
    return(row) 
} 

(对不起,我不能让这个重复性 - 我显然不能用手在网上的钥匙DB)

当我运行此,第一c = number of cores步骤来运行良好,但之后它挂起和什么都不做 - indefi据我所知,尽可能少用。 htop在任何内核上均未显示任何活动。当我将for循环更改为仅循环使用c条目时,输出结果就是我所期望的。当我从并行切换到串行(%do%而不是%dopar%)时,它也可以正常工作。

这是否与连接未被正确关闭或以某种方式被冗余定义有关?我已将连接放置在并行循环中,因此每个核心都应该在自己的环境中拥有自己的连接。但是我对数据库知之甚少,无法分辨这是否足够明显。

我会很感激的答案,帮助我理解这里引擎盖下发生了什么 - 这一切对我来说都是伏都教。

回答

0

您是否将RJDBC包(以及它的依赖关系 - methods,DBIrJava)传递到群集中的任何位置?

如果没有,你对你的代码的第一行应该类似于下面:

results <- foreach(i = 1:length(fipsvec), 
        .combine = rbind, 
        .errorhandling = "remove", 
        .packages=c('methods','DBI','rJava','RJDBC')) %dopar% { 

有一件事,我怀疑(但不知道)可能使事情有点多毛是RJDBC使用JVM执行查询。对于rJava如何处理JVM初始化,以及每个线程是否试图同时重新使用同一个JVM,或者如果他们有足够的外部环境信息来正确初始化一个JVM,都不太了解。

如果上述操作不起作用,另一个故障排除步骤可能是将driver的分配步骤移动到%dopar%环境中。

在另一个轨道上,结果集中有多少行?如果结果集在百万行的范围内,并且可以通过单个查询返回,我实际上在RJDBC包中遇到了优化的机会,并且在github上有一个公开的请求(https://github.com/s-u/RJDBC/pull/50),我没有听到任何信息但已经使用了我几个月。在拉取请求中记录了一个基本的基准,我发现加速对于我正在运行的特定查询非常重要。

如果它似乎适用,您可以用安装分支:

library(devtools)  
devtools::install_github("msummersgill/RJDBC",ref = "harmonize", force = TRUE) 
+0

它不是'.packages'说法。根据我的经验,这只对Windows机器很重要。无论如何,当我在4核实例上循环4个条目时,并行脚本工作正常。所以每个线程都可以执行一次 - 只有一次 - 迭代。现在尝试移动驱动程序... –

+0

另一个尝试的步骤可能是将'.jinit(force.init = TRUE)'行作为%dopar%环境中的第一行。 (但是我真的只是在这个时候吐口水,也不知道自己对java端的了解) –

+0

移动驱动程序调用也没有帮助。现在尝试使用'harmonize'分支 - 我试图并行执行此操作的原因是,当尝试将大块读入内存时出现错误。我只有大约1e5行,但我有3000列,我需要其中大部分。 –