当我们进入大数据时代之后,并行运算就十分重要了。什么是大数据呢?引用一句流行语吧:
Big data is hype. Everybody talks about big data; nobody knows exactly what it is.
对于生物学来说,大数据对应的应该是第二代测序技术的兴起。所以我们谈到并行运算,主要是为了处理测序数据。在R中,有许多并行运算的包,HighPerformanceComputing就列举了当前所有的用于并行运算的软件包。为了将它更简单的呈现,我绘制了一张图,以便大家能够对常用的一些并行运算的软件包有一个大约的印象,当人们谈及这些包时,我们会有一个印象。 从上图中,可以看到,snow几乎关联了所有的包。但是,因为它很基础,所以很多包对它进行了再一次包装,以达到编程人员即使不了解硬件信息,也可以很轻松地移植并行运算代码的目的。而本文的重点,当然是bioconductor中最新出现的用于parallel运算的包:BiocParallel。它的优势在于它支持了很多bioconductor的类,比如GRanges数据。 下面,我们就通过一个实例来了解如何使用BiocParallel,以及在使用的过程中应该注意些什么。
- > ##获取一个大一点的数据
- > library(GEOquery)
- > setwd("Downloads/")
- > getGEOSuppFiles("GSM917672")
- [1] "ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl/"
- trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_+_density.wig.gz'
- ftp data connection made, file length 17133766 bytes
- opened URL
- ==================================================
- downloaded 16.3 Mb
- trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_-_density.wig.gz'
- ftp data connection made, file length 16134038 bytes
- opened URL
- ==================================================
- downloaded 15.4 Mb
- > setwd("GSM917672/")
- > sapply(dir(), gunzip)
- $`GSM917672_286_-_density.wig.gz`
- character(0)
- $`GSM917672_286_+_density.wig.gz`
- character(0)
- > file <- dir()[1]
- > ##将数据读入内存
- > s <- file.info(file)$size
- > buf <- readChar(file, s, useBytes=TRUE)
- > ##将数据按照WIG文件格式说明分割成小段,每一段只包含一段完整的WIG峰值
- > buf <- strsplit(buf, "\n", fixed=TRUE, useBytes=TRUE)[[1]]
- > infoLine <- grep("Step", buf)
- > block <- c(infoLine, length(buf)+1)
- > dif <- diff(block)
- > block <- rep(infoLine, dif)
- > buf <- split(buf, block)
- > ##WIG文件的处理
- > library(GenomicRanges)
- > parser <- function(ele){
- + library(GenomicRanges)##在并行运算中,所有的包都需要再次载入,而在非并行运算中只需要载入一次
- + firstline <- ele[1]
- + firstline <- unlist(strsplit(firstline, "\\s"))
- + firstline <- firstline[firstline!=""]
- + structure <- firstline[1]
- + firstline <- firstline[-1]
- + firstline <- do.call(rbind,strsplit(firstline, "=", fixed=TRUE))
- + firstline <- firstline[match(c("chrom","span","start","step"),
- + firstline[,1]),]
- + wiginfo <- c(structure, firstline[,2])
- + names(wiginfo) <- c("structure", "chrom","span","start","step")
- + start <- as.numeric(wiginfo["start"])
- + end <- start + 1:((length(ele)-1) * as.numeric(wiginfo["step"])) - 1
- + start <- c(start, end+1)[1:(length(ele)-1)]
- + value <- as.numeric(ele[2:length(ele)])
- + GRanges(seqnames=wiginfo["chrom"],
- + ranges=IRanges(start, end),
- + score=value)
- + }
- > ##使用pbapply中pblapply来比较运算速度。因为这个过程中也同样需要输出进度条
- > library(pbapply)
- > system.time(r1 <- pblapply(buf[1:100], parser))
- |++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
- user system elapsed
- 1.474 0.017 1.512
- > system.time(r1 <- pblapply(buf[1:10000], parser))
- |++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
- user system elapsed
- 152.830 0.867 157.519
- > system.time(r1 <- bplapply(buf[1:50000], parser))
- user system elapsed
- 1247.566 6.536 1257.071
- > ##使用BiocParallel来进行运算
- > library("BiocParallel")
- > library(BatchJobs)
- > cluster.functions <- makeClusterFunctionsMulticore()
- Setting for worker localhost: ncpus=8
- > bpParams <- BatchJobsParam(cluster.functions=cluster.functions)
- > system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
- SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
- Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
- user system elapsed
- 0.425 0.325 10.675
- > system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
- SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
- Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
- user system elapsed
- 7.898 2.351 166.118
- > system.time(r2 <- bplapply(buf[1:50000], parser, BPPARAM=bpParams))
- SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
- Waiting [S:0 R:0 D:50000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00):02:51))
- user system elapsed
- 55.197 11.359 732.627
- >
- > ##比较运算结果是否一致
- > identical(r1, r2)
- [1] TRUE
从上面的步骤来看,使用BiocParallel进行运算分为三步走, 第一步,书写线程安全的程序,因为每个内核都会重新启动一个R来进行运算,所以程序的书写必须保证每个全新启动的R都可以正确的在其环境中查找到你所需要的所有变量或者函数。 第二步,设置环境参数。这一步使用makeClusterFunctions相关的函数。示例中我们使用了makeClusterFunctionsMulticore函数,其它还有makeClusterFunctionsInteractive, makeClusterFunctionsLocal, makeClusterFunctionsSSH, makeClusterFunctionsTorque, makeClusterFunctionsSGE, makeClusterFunctionsSLURM,具体可参见:?makeClusterFunctions。 第三步,使用bplapply函数提交运算。 从运算的时效性来看,如果每个单独的线程都只是很小的运算量,使用多核并不一定能节约时间,反而有可能因为等待线程结果而花费更多的时间。所以这种时候,不如使用GPU运算。如果每个单独的线程运算量很大时,使用多核就可以极大的节约时间了。 对R中的并行运算的中文博文,还可以参考:http://algosolo.com/competition/429.html 原文来自:http://pgfe.umassmed.edu/ou/archives/3396