Bioconductor并行(parallel)运算

当我们进入大数据时代之后,并行运算就十分重要了。什么是大数据呢?引用一句流行语吧:

Big data is hype. Everybody talks about big data; nobody knows exactly what it is.

对于生物学来说,大数据对应的应该是第二代测序技术的兴起。所以我们谈到并行运算,主要是为了处理测序数据。在R中,有许多并行运算的包,HighPerformanceComputing就列举了当前所有的用于并行运算的软件包。为了将它更简单的呈现,我绘制了一张图,以便大家能够对常用的一些并行运算的软件包有一个大约的印象,当人们谈及这些包时,我们会有一个印象。 Bioconductor并行(parallel)运算 从上图中,可以看到,snow几乎关联了所有的包。但是,因为它很基础,所以很多包对它进行了再一次包装,以达到编程人员即使不了解硬件信息,也可以很轻松地移植并行运算代码的目的。而本文的重点,当然是bioconductor中最新出现的用于parallel运算的包:BiocParallel。它的优势在于它支持了很多bioconductor的类,比如GRanges数据。 下面,我们就通过一个实例来了解如何使用BiocParallel,以及在使用的过程中应该注意些什么。

  1. > ##获取一个大一点的数据
  2. > library(GEOquery)
  3. > setwd("Downloads/")
  4. > getGEOSuppFiles("GSM917672")
  5. [1] "ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl/"
  6. trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_+_density.wig.gz'
  7. ftp data connection made, file length 17133766 bytes
  8. opened URL
  9. ==================================================
  10. downloaded 16.3 Mb
  11. trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_-_density.wig.gz'
  12. ftp data connection made, file length 16134038 bytes
  13. opened URL
  14. ==================================================
  15. downloaded 15.4 Mb
  16. > setwd("GSM917672/")
  17. > sapply(dir(), gunzip)
  18. $`GSM917672_286_-_density.wig.gz`
  19. character(0)
  20. $`GSM917672_286_+_density.wig.gz`
  21. character(0)
  22. > file <- dir()[1]
  23. > ##将数据读入内存
  24. > s <- file.info(file)$size
  25. > buf <- readChar(file, s, useBytes=TRUE)
  26. > ##将数据按照WIG文件格式说明分割成小段,每一段只包含一段完整的WIG峰值
  27. > buf <- strsplit(buf, "\n", fixed=TRUE, useBytes=TRUE)[[1]]
  28. > infoLine <- grep("Step", buf)
  29. > block <- c(infoLine, length(buf)+1)
  30. > dif <- diff(block)
  31. > block <- rep(infoLine, dif)
  32. > buf <- split(buf, block)
  33. > ##WIG文件的处理
  34. > library(GenomicRanges)
  35. > parser <- function(ele){
  36. + library(GenomicRanges)##在并行运算中,所有的包都需要再次载入,而在非并行运算中只需要载入一次
  37. + firstline <- ele[1]
  38. + firstline <- unlist(strsplit(firstline, "\\s"))
  39. + firstline <- firstline[firstline!=""]
  40. + structure <- firstline[1]
  41. + firstline <- firstline[-1]
  42. + firstline <- do.call(rbind,strsplit(firstline, "=", fixed=TRUE))
  43. + firstline <- firstline[match(c("chrom","span","start","step"),
  44. + firstline[,1]),]
  45. + wiginfo <- c(structure, firstline[,2])
  46. + names(wiginfo) <- c("structure", "chrom","span","start","step")
  47. + start <- as.numeric(wiginfo["start"])
  48. + end <- start + 1:((length(ele)-1) * as.numeric(wiginfo["step"])) - 1
  49. + start <- c(start, end+1)[1:(length(ele)-1)]
  50. + value <- as.numeric(ele[2:length(ele)])
  51. + GRanges(seqnames=wiginfo["chrom"],
  52. + ranges=IRanges(start, end),
  53. + score=value)
  54. + }
  55. > ##使用pbapply中pblapply来比较运算速度。因为这个过程中也同样需要输出进度条
  56. > library(pbapply)
  57. > system.time(r1 <- pblapply(buf[1:100], parser))
  58. |++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
  59. user system elapsed
  60. 1.474 0.017 1.512
  61. > system.time(r1 <- pblapply(buf[1:10000], parser))
  62. |++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
  63. user system elapsed
  64. 152.830 0.867 157.519
  65. > system.time(r1 <- bplapply(buf[1:50000], parser))
  66. user system elapsed
  67. 1247.566 6.536 1257.071
  68. > ##使用BiocParallel来进行运算
  69. > library("BiocParallel")
  70. > library(BatchJobs)
  71. > cluster.functions <- makeClusterFunctionsMulticore()
  72. Setting for worker localhost: ncpus=8
  73. > bpParams <- BatchJobsParam(cluster.functions=cluster.functions)
  74. > system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
  75. SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
  76. Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
  77. user system elapsed
  78. 0.425 0.325 10.675
  79. > system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
  80. SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
  81. Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
  82. user system elapsed
  83. 7.898 2.351 166.118
  84. > system.time(r2 <- bplapply(buf[1:50000], parser, BPPARAM=bpParams))
  85. SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
  86. Waiting [S:0 R:0 D:50000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00):02:51))
  87. user system elapsed
  88. 55.197 11.359 732.627
  89. >
  90. > ##比较运算结果是否一致
  91. > identical(r1, r2)
  92. [1] TRUE

从上面的步骤来看,使用BiocParallel进行运算分为三步走, 第一步,书写线程安全的程序,因为每个内核都会重新启动一个R来进行运算,所以程序的书写必须保证每个全新启动的R都可以正确的在其环境中查找到你所需要的所有变量或者函数。 第二步,设置环境参数。这一步使用makeClusterFunctions相关的函数。示例中我们使用了makeClusterFunctionsMulticore函数,其它还有makeClusterFunctionsInteractive, makeClusterFunctionsLocal, makeClusterFunctionsSSH, makeClusterFunctionsTorque, makeClusterFunctionsSGE, makeClusterFunctionsSLURM,具体可参见:?makeClusterFunctions。 第三步,使用bplapply函数提交运算。 从运算的时效性来看,如果每个单独的线程都只是很小的运算量,使用多核并不一定能节约时间,反而有可能因为等待线程结果而花费更多的时间。所以这种时候,不如使用GPU运算。如果每个单独的线程运算量很大时,使用多核就可以极大的节约时间了。 对R中的并行运算的中文博文,还可以参考:http://algosolo.com/competition/429.html 原文来自:http://pgfe.umassmed.edu/ou/archives/3396

发表评论

匿名网友

拖动滑块以完成验证
加载中...