Julia的MPI并行

 

在之前搞计算的时候,虽然也用到了Julia的并行计算,但实现方法上并没有利用MPI,单速度上勉强也是够用的.最近遇到了计算量比较大的情形,此时如果可以在集群上多使用几个节点,多点CPU的话计算速度就可以显著提高.这里就整理一下如何结合MPI实现对Julia的并行。

在之前搞计算的时候,虽然也用到了Julia的并行计算,但实现方法上并没有利用MPI,单速度上勉强也是够用的.最近遇到了计算量比较大的情形,此时如果可以在集群上多使用几个节点,多点CPU的话计算速度就可以显著提高.这里就整理一下如何结合MPI实现对Julia的并行。

并行循环

基本操作

首先要使用Julia的MPI包,但是目前给出的文档在实用性方面比较差,如果本来已经熟悉MPI的话也勉强能看,连蒙带猜的可以用一下这些函数,我就是这么做的。

首先是进行环境初始化

using MPI
MPI.Init()
comm = MPI.COMM_WORLD
root = 0
numcore = MPI.Comm_size(comm)
indcore = MPI.Comm_rank(comm)

这里的MPI.Comm_size(comm)用来获取计算时能够使用的总CPU数量,MPI.Comm_rank(comm)则是对每个CPU一个编号,方便后续的管理以及CPU之间的通信。一般习惯上会将MPI.Comm_rank(comm)=0的核心称为root,因为最后的数据收集以及保存等操作会在这个CPU上进行。得到了这些信息之后,接下来的目的就是将一个循环分成不同的区间,然后每个CPU计算不同的部分。加入我先在想实现一个求和功能

function test(x1,x2)
    te1 = x1:x2
    re::Float64 = 0.0
    for i0 in eachindex(te1)
        re += te1[i0]
    end
    return re
end

将函数放到MPI并行中首先要进行环境的初始化,然后将函数要想实现的功能分配到不同的CPU上面进行计算

numcore = MPI.Comm_size(comm)  # 获取总的CPU数量
indcore = MPI.Comm_rank(comm)  # 每个CPU的索引
numk::Int64 = 1000000
nki = floor(indcore * numk/numcore)
nkf = nki + floor(numk/numcore)
temp = test(nki + 1,nkf)

这里的函数test(x,y)有两个输入,可以将其看做是一个参数区间,而根据这里的划分就可以看到实际上每个CPU所计算的函数test(floor(indcore * numk/numcore) + 1,nki + floor(numk/numcore))都是不同的,而这些结果也都会分别存储在不同的CPU中。

最后就是要将不同CPU中计算的结果汇总到一起,因为这里我用test()函数实现的是一个求和的功能,因此使用

re1 = MPI.Reduce(temp,MPI.SUM,root,comm)  # 在root核心上得到所要的结果

将所有CPU上得到的结果都加起来,然后在root上将最终的结果进行保存

if (MPI.Comm_rank(comm) == root)  # 判断在root核心上做文件保存
        temp1 = (a->(@sprintf "%3.2f" a)).(numk)
        fx1 ="chi-nk-" * temp1 * ".dat"
        f1 = open(fx1,"w")
        x0 = (a->(@sprintf "%10.3f" a)).(re1)
        writedlm(f1,[re1],"\t")
        close(f1)
    end

到此为止就使用MPI实现了简单的并行计算,而且利用MPI实现的并行可以在集群上跨节点,也就是同时并多个节点进行计算。而之前使用Distributed方式的时候,我暂时没有实现如何在多个节点上并行。

废话不多上了直接上代码

using MPI,SharedArrays,LinearAlgebra,Distributed,DelimitedFiles,Printf,Dates
#---------------------------------------
function test(x1,x2)
    te1 = x1:x2
    re::Float64 = 0.0
    for i0 in eachindex(te1)
        re += te1[i0]
    end
    return re
end
#---------------------------------------
function main()
    MPI.Init()
    comm = MPI.COMM_WORLD
    root = 0
    if (MPI.Comm_rank(comm) == root)
        println("开始计算裸的极化率 : ",Dates.now(),"\n")
    end

    numcore = MPI.Comm_size(comm)
    indcore = MPI.Comm_rank(comm)
    numk::Int64 = 1000000
    # println("Hello world, I am rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))\n")
    # re1::Float64 = 0.0

    # 将要计算的范围根据每个核心的id进行分配,不同的核心计算不同的部分,尽量避免重复计算,尤其是要对最后结果求和的情形
    nki = floor(indcore * numk/numcore)
    nkf = nki + floor(numk/numcore)


    temp = test(nki + 1,nkf)  # 要计算的量

    MPI.Barrier(comm)
    re1 = MPI.Reduce(temp,MPI.SUM,root,comm)  # 在root核心上得到所要的结果
    if (MPI.Comm_rank(comm) == root)  # 判断在root核心上做文件保存
        temp1 = (a->(@sprintf "%3.2f" a)).(numk)
        fx1 ="chi-nk-" * temp1 * ".dat"
        f1 = open(fx1,"w")
        x0 = (a->(@sprintf "%10.3f" a)).(re1)
        writedlm(f1,[re1],"\t")
        close(f1)
    end
    # MPI.bcast(re1,comm)
    if (MPI.Comm_rank(comm) == root)
        println("结束计算裸的极化率 : ",Dates.now(),"\n")
    end
    MPI.Finalize()
    
end
#---------------------------------------
main()

上面实现了将一个循环根据CPU的编号分成不同的区间,每个CPU执行自己区间的那一部分,最后通过MPI.Reduce函数将所有CPU计算的结果都合并到一起,因为预先分配了数组的元素都是零,所以对于不同CPU计算的结果,只要保证分配的区间没有重合,那么自然就可以保证加和操作得到的结果是没有问题的。因此在写并行程序的时候,最好保证循环数numk是并行CPU数量corenum的整数倍,这样可以很好的避免区间分配重复。

并行数组赋值

加下来考虑如何并行计算并将结果赋值给一个二维数组,上面实现的只是将结果赋值给单独的一个数组,实际上方法还是一样,不过此时就需要写一个二重循环,但是只需要将其中的一层循环像前面一样并行分配到不同的CPU即可,示例代码如下

function main2()
    MPI.Init()
    comm = MPI.COMM_WORLD
    root = 0

    numcore = MPI.Comm_size(comm)
    indcore = MPI.Comm_rank(comm)

    numk::Int64 = 200
    re1 = zeros(Float64,numk,numk)

    nki = floor(indcore * numk/numcore) + 1
    nkf = floor((indcore + 1) * numk/numcore)



    #println("Hello world, I am rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))\n")
    for i0 in nki:nkf   # 并行分发
        for i1 in 1:numk
            kx = i0
            ky = i1
            i0 = Int(i0)  # 保证索引是整数
            re1[i0,i1] =  tmat(kx,ky) # 函数返回一个数即可
	    end 
    end

    MPI.Barrier(comm)
    re1 = MPI.Reduce(re1,MPI.SUM,root,comm)  # 在root核心上得到所要的结果

    if (MPI.Comm_rank(comm) == root)  # 判断在root核心上做文件保存
        println("Number of CPU: $(MPI.Comm_size(comm))\n")
        temp1 = (a->(@sprintf "%3.2f" a)).(numk)
        fx1 = "mat-nk-" * temp1 * ".dat"
        f1 = open(fx1,"w")
        x0 = (a->(@sprintf "%10.3f" a)).(re1[:,1])
        y0 = (a->(@sprintf "%10.3f" a)).(re1[:,2])
        writedlm(f1,[x0 y0],"\t")
        close(f1)
    end
    MPI.Finalize()
end

相比于前面这里就只是增加了一重循环,最后同样使用MPI.Reduce(re1,MPI.SUM,root,comm)将结果收集到一起。

任务提交

集群上提交任务的脚本为

#!/bin/bash
#SBATCH --job-name=test
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=20
#SBATCH --cpus-per-task=1
#SBATCH -t 3-00:00:00

# load the environment
module purge
module load compiler/intel/2021.3.0
module load mpi/intelmpi/2021.3.0
module load apps/vasp/6.4.0-i21
module load apps/wannier90/3.1-i21
#your_home/soft/vasp6_d4/bin

# where is your binary file
source your_home/soft/anaconda/anaconda3/etc/profile.d/conda.sh
julia=your_home/soft/julia-1.9.4/bin/julia
python=your_home/soft/anaconda/anaconda3/bin/python3.11

NUM_MPI=$SLURM_NTASKS


echo "======== Job starts at `date +'%Y-%m-%d %T'` on `hostname` ======== "

mpirun -np ${NUM_MPI} julia test.jl

echo "======== Job ends at `date +'%Y-%m-%d %T'` on `hostname` ======== "

公众号

相关内容均会在公众号进行同步,若对该Blog感兴趣,欢迎关注微信公众号。

png