在之前搞计算的时候,虽然也用到了Julia的并行计算,但实现方法上并没有利用MPI,单速度上勉强也是够用的.最近遇到了计算量比较大的情形,此时如果可以在集群上多使用几个节点,多点CPU的话计算速度就可以显著提高.这里就整理一下如何结合MPI实现对Julia的并行。
并行循环
基本操作
首先要使用Julia的MPI包,但是目前给出的文档在实用性方面比较差,如果本来已经熟悉MPI的话也勉强能看,连蒙带猜的可以用一下这些函数,我就是这么做的。
首先是进行环境初始化
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
root = 0
numcore = MPI.Comm_size(comm)
indcore = MPI.Comm_rank(comm)
这里的MPI.Comm_size(comm)
用来获取计算时能够使用的总CPU数量,MPI.Comm_rank(comm)
则是对每个CPU一个编号,方便后续的管理以及CPU之间的通信。一般习惯上会将MPI.Comm_rank(comm)=0
的核心称为root
,因为最后的数据收集以及保存等操作会在这个CPU上进行。得到了这些信息之后,接下来的目的就是将一个循环分成不同的区间,然后每个CPU计算不同的部分。加入我先在想实现一个求和功能
function test(x1,x2)
te1 = x1:x2
re::Float64 = 0.0
for i0 in eachindex(te1)
re += te1[i0]
end
return re
end
将函数放到MPI并行中首先要进行环境的初始化,然后将函数要想实现的功能分配到不同的CPU上面进行计算
numcore = MPI.Comm_size(comm) # 获取总的CPU数量
indcore = MPI.Comm_rank(comm) # 每个CPU的索引
numk::Int64 = 1000000
nki = floor(indcore * numk/numcore)
nkf = nki + floor(numk/numcore)
temp = test(nki + 1,nkf)
这里的函数test(x,y)
有两个输入,可以将其看做是一个参数区间,而根据这里的划分就可以看到实际上每个CPU所计算的函数test(floor(indcore * numk/numcore) + 1,nki + floor(numk/numcore))
都是不同的,而这些结果也都会分别存储在不同的CPU中。
最后就是要将不同CPU中计算的结果汇总到一起,因为这里我用test()
函数实现的是一个求和的功能,因此使用
re1 = MPI.Reduce(temp,MPI.SUM,root,comm) # 在root核心上得到所要的结果
将所有CPU上得到的结果都加起来,然后在root
上将最终的结果进行保存
if (MPI.Comm_rank(comm) == root) # 判断在root核心上做文件保存
temp1 = (a->(@sprintf "%3.2f" a)).(numk)
fx1 ="chi-nk-" * temp1 * ".dat"
f1 = open(fx1,"w")
x0 = (a->(@sprintf "%10.3f" a)).(re1)
writedlm(f1,[re1],"\t")
close(f1)
end
到此为止就使用MPI实现了简单的并行计算,而且利用MPI实现的并行可以在集群上跨节点,也就是同时并多个节点进行计算。而之前使用Distributed方式的时候,我暂时没有实现如何在多个节点上并行。
废话不多上了直接上代码
using MPI,SharedArrays,LinearAlgebra,Distributed,DelimitedFiles,Printf,Dates
#---------------------------------------
function test(x1,x2)
te1 = x1:x2
re::Float64 = 0.0
for i0 in eachindex(te1)
re += te1[i0]
end
return re
end
#---------------------------------------
function main()
MPI.Init()
comm = MPI.COMM_WORLD
root = 0
if (MPI.Comm_rank(comm) == root)
println("开始计算裸的极化率 : ",Dates.now(),"\n")
end
numcore = MPI.Comm_size(comm)
indcore = MPI.Comm_rank(comm)
numk::Int64 = 1000000
# println("Hello world, I am rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))\n")
# re1::Float64 = 0.0
# 将要计算的范围根据每个核心的id进行分配,不同的核心计算不同的部分,尽量避免重复计算,尤其是要对最后结果求和的情形
nki = floor(indcore * numk/numcore)
nkf = nki + floor(numk/numcore)
temp = test(nki + 1,nkf) # 要计算的量
MPI.Barrier(comm)
re1 = MPI.Reduce(temp,MPI.SUM,root,comm) # 在root核心上得到所要的结果
if (MPI.Comm_rank(comm) == root) # 判断在root核心上做文件保存
temp1 = (a->(@sprintf "%3.2f" a)).(numk)
fx1 ="chi-nk-" * temp1 * ".dat"
f1 = open(fx1,"w")
x0 = (a->(@sprintf "%10.3f" a)).(re1)
writedlm(f1,[re1],"\t")
close(f1)
end
# MPI.bcast(re1,comm)
if (MPI.Comm_rank(comm) == root)
println("结束计算裸的极化率 : ",Dates.now(),"\n")
end
MPI.Finalize()
end
#---------------------------------------
main()
上面实现了将一个循环根据CPU的编号分成不同的区间,每个CPU执行自己区间的那一部分,最后通过MPI.Reduce
函数将所有CPU计算的结果都合并到一起,因为预先分配了数组的元素都是零,所以对于不同CPU计算的结果,只要保证分配的区间没有重合,那么自然就可以保证加和操作得到的结果是没有问题的。因此在写并行程序的时候,最好保证循环数numk
是并行CPU数量corenum
的整数倍,这样可以很好的避免区间分配重复。
并行数组赋值
加下来考虑如何并行计算并将结果赋值给一个二维数组,上面实现的只是将结果赋值给单独的一个数组,实际上方法还是一样,不过此时就需要写一个二重循环,但是只需要将其中的一层循环像前面一样并行分配到不同的CPU即可,示例代码如下
function main2()
MPI.Init()
comm = MPI.COMM_WORLD
root = 0
numcore = MPI.Comm_size(comm)
indcore = MPI.Comm_rank(comm)
numk::Int64 = 200
re1 = zeros(Float64,numk,numk)
nki = floor(indcore * numk/numcore) + 1
nkf = floor((indcore + 1) * numk/numcore)
#println("Hello world, I am rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))\n")
for i0 in nki:nkf # 并行分发
for i1 in 1:numk
kx = i0
ky = i1
i0 = Int(i0) # 保证索引是整数
re1[i0,i1] = tmat(kx,ky) # 函数返回一个数即可
end
end
MPI.Barrier(comm)
re1 = MPI.Reduce(re1,MPI.SUM,root,comm) # 在root核心上得到所要的结果
if (MPI.Comm_rank(comm) == root) # 判断在root核心上做文件保存
println("Number of CPU: $(MPI.Comm_size(comm))\n")
temp1 = (a->(@sprintf "%3.2f" a)).(numk)
fx1 = "mat-nk-" * temp1 * ".dat"
f1 = open(fx1,"w")
x0 = (a->(@sprintf "%10.3f" a)).(re1[:,1])
y0 = (a->(@sprintf "%10.3f" a)).(re1[:,2])
writedlm(f1,[x0 y0],"\t")
close(f1)
end
MPI.Finalize()
end
相比于前面这里就只是增加了一重循环,最后同样使用MPI.Reduce(re1,MPI.SUM,root,comm)
将结果收集到一起。
任务提交
集群上提交任务的脚本为
#!/bin/bash
#SBATCH --job-name=test
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=20
#SBATCH --cpus-per-task=1
#SBATCH -t 3-00:00:00
# load the environment
module purge
module load compiler/intel/2021.3.0
module load mpi/intelmpi/2021.3.0
module load apps/vasp/6.4.0-i21
module load apps/wannier90/3.1-i21
#your_home/soft/vasp6_d4/bin
# where is your binary file
source your_home/soft/anaconda/anaconda3/etc/profile.d/conda.sh
julia=your_home/soft/julia-1.9.4/bin/julia
python=your_home/soft/anaconda/anaconda3/bin/python3.11
NUM_MPI=$SLURM_NTASKS
echo "======== Job starts at `date +'%Y-%m-%d %T'` on `hostname` ======== "
mpirun -np ${NUM_MPI} julia test.jl
echo "======== Job ends at `date +'%Y-%m-%d %T'` on `hostname` ======== "
公众号
相关内容均会在公众号进行同步,若对该Blog感兴趣,欢迎关注微信公众号。