亲爱的,我是学习 PETSC 的大一新生。我基于 PETSC 库编写了一个非常简单的一维扩散问题代码(只是简单的 FDM 代码)。我想在每个时间步中使用 PETSC 的并行求解器。这是伪代码:
Initial(C0);
for(timestep=0;timestep<MaxTimeStep;timestep++)
{
Assemble(A,C,F); // Assemble system's A*C=F matrix and vector
KSP.solve(A,C,F); // Use PETSC's ksp solver to solve A*C=F
// I just want to make the solve step paralleled
// other part don't need to be paralleled
C0=C; // Update
SaveResult(C0); // Output the result C0
}
这段代码在顺序模式下工作得很好(矩阵和向量都基于 MPI 类型),但是当我尝试使用 mpirun -np 4 ./a.out 时,结果完全错误。我猜 mpirun 使时间步长循环也并行,但我不确定。
所以,我的问题是:
如何在循环中使用 PETSC 的并行求解器,或者换句话说,如何在顺序时间步长中使用并行求解器或 mpirun?
我是否可以使用
mpirun -np 4 ./myapp
并行求解器,但对于时间步进和结果输出,它们仍然是顺序的?在仿真中,时间步长和结果输出必须一一执行,所以我只需要并行求解器,而不是整个程序。
谢谢!
#include "petscksp.h"
void PrintToVTU(PetscInt k,PetscReal dx,Vec u);
int main(int argc,char **args)
{
Vec u,u0;
Mat A;
KSP ksp;
PetscRandom rctx;
PetscReal norm;
PetscInt ne;
PetscReal Length,dx,D;
PetscReal Ca,Cb,alpha;
PetscReal T,dt,ti;
PetscInt i,j,Ii,J,Istart,Iend,m,n,its;
PetscBool flg=PETSC_FALSE;
PetscScalar v;
printf("Input the Length:");
scanf("%f",&Length);
printf("Input the element number:");
scanf("%d",&ne);
printf("Input the Ca,Cb and D:");
scanf("%f %f %f",&Ca,&Cb,&D);
printf("Input dt and T:");
scanf("%f %f",&dt,&T);
n=ne+1;m=n;
dx=Length/ne;
alpha=dt*D/(dx*dx);
#if defined(PETSC_USE_LOG)
PetscLogStage stage;
#endif
static char help[] = "Solves a linear system in parallel with
KSP.\n\
Iput parameters include:\n-random_exact_sol : use a random exact
solution vector\n\
-view_exact_sol : write exact solution vector to stdout\n\
-m <mesh_x> : number of mesh points in x-direction\n\
-n <mesh_n> : number of mesh points in y-direction\n\n";
PetscInitialize(&argc,&args,(char*)0,help);
PetscOptionsGetInt(NULL,NULL,"-m",&m,NULL);
PetscOptionsGetInt(NULL,NULL,"-n",&n,NULL);
printf("m=%d\tn=%d\n",m,n);
// Create Matrix
MatCreate(PETSC_COMM_WORLD,&A);
MatSetSizes(A,PETSC_DECIDE,PETSC_DECIDE,m,n);
MatSetFromOptions(A);
MatMPIAIJSetPreallocation(A,5,NULL,5,NULL);
MatSeqAIJSetPreallocation(A,5,NULL);
MatSeqSBAIJSetPreallocation(A,1,5,NULL);
// Create Vector
VecCreate(PETSC_COMM_WORLD,&u0);
VecSetSizes(u0,PETSC_DECIDE,m);
VecSetFromOptions(u0);
VecDuplicate(u0,&u);
MatGetOwnershipRange(A,&Istart,&Iend);
PetscLogStageRegister("Assembly",&stage);
PetscLogStagePush(stage);
for(Ii=Istart;Ii<Iend;Ii++)
{
if(Ii==0)
{
v=1.0+2.0*alpha;J=Ii;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
v=-2.0*alpha;J=Ii+1;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
}
else if(Ii==n-1)
{
v=-2.0*alpha;J=Ii-1;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
v=1.0+2.0*alpha;J=Ii;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
}
else
{
v=-alpha;J=Ii-1;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
v=1.0+2.0*alpha;J=Ii;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
v=-alpha;J=Ii+1;
MatSetValues(A,1,&Ii,1,&J,&v,ADD_VALUES);
}
if(Ii<=m/2)
{
VecSetValues(u0,1,&Ii,&Ca,ADD_VALUES);
}
else
{
VecSetValues(u0,1,&Ii,&Cb,ADD_VALUES);
}
}
MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
VecAssemblyBegin(u0);
VecAssemblyEnd(u0);
KSPCreate(PETSC_COMM_WORLD,&ksp);
KSPSetOperators(ksp,A,A);
KSPSetFromOptions(ksp);
dt=1.0;T=2.0;i=0;
PrintToVTU(i,dx,u0);
printf("Hello\n");
for(ti=0.0;ti<=T;ti+=dt)
{
i+=1;
KSPSolve(ksp,u0,u);
VecCopy(u,u0);
PrintToVTU(i,dx,u0);
}
}