Actual source code: aodatabasic.c

  1: /*$Id: aodatabasic.c,v 1.64 2001/08/07 03:04:35 balay Exp $*/

  3: /*
  4:   The most basic AOData routines. These store the entire database on each processor.
  5:   These routines are very simple; note that we do not even use a private data structure
  6:   for AOData, and the private datastructure for AODataSegment is just used as a simple array.

  8:   These are made slightly complicated by having to be able to handle logical variables
  9:   stored in bit arrays. Thus,
 10:     - Before mallocing to hold a bit array, we shrunk the array length by a factor
 11:       of 8 using PetscBTLength()
 12:     - We use PetscBitMemcpy() to allow us to copy at the individual bit level;
 13:       for regular datatypes this just does a regular memcpy().
 14: */

 16:  #include src/dm/ao/aoimpl.h
 17:  #include petscsys.h
 18:  #include petscbt.h

 20: #undef __FUNCT__  
 22: int AODataDestroy_Basic(AOData ao)
 23: {
 24:   int           ierr;
 25:   AODataKey     *key = ao->keys,*nextkey;
 26:   AODataSegment *seg,*nextseg;

 29:   /* if memory was published with AMS then destroy it */
 30:   PetscObjectDepublish(ao);

 32:   while (key) {
 33:     PetscFree(key->name);
 34:     if (key->ltog) {
 35:       ISLocalToGlobalMappingDestroy(key->ltog);
 36:     }
 37:     seg = key->segments;
 38:     while (seg) {
 39:       ierr    = PetscFree(seg->data);
 40:       ierr    = PetscFree(seg->name);
 41:       nextseg = seg->next;
 42:       ierr    = PetscFree(seg);
 43:       seg     = nextseg;
 44:     }
 45:     PetscFree(key->rowners);
 46:     nextkey = key->next;
 47:     PetscFree(key);
 48:     key     = nextkey;
 49:   }
 50: 
 51:   PetscLogObjectDestroy(ao);
 52:   PetscHeaderDestroy(ao);
 53:   return(0);
 54: }

 56: #undef __FUNCT__  
 58: int AODataView_Basic_Binary(AOData ao,PetscViewer viewer)
 59: {
 60:   int             ierr,N,fd;
 61:   AODataSegment   *segment;
 62:   AODataKey       *key = ao->keys;
 63:   char            paddedname[256];


 67:   ierr  = PetscViewerBinaryGetDescriptor(viewer,&fd);

 69:   /* write out number of keys */
 70:   PetscBinaryWrite(fd,&ao->nkeys,1,PETSC_INT,0);

 72:   while (key) {
 73:     N   = key->N;
 74:     /* 
 75:        Write out name of key - use a fixed length for the name in the binary 
 76:        file to make seeking easier
 77:     */
 78:     PetscMemzero(paddedname,256*sizeof(char));
 79:     PetscStrncpy(paddedname,key->name,255);
 80:     PetscBinaryWrite(fd,paddedname,256,PETSC_CHAR,0);
 81:     /* write out the number of indices */
 82:     PetscBinaryWrite(fd,&key->N,1,PETSC_INT,0);
 83:     /* write out number of segments */
 84:     PetscBinaryWrite(fd,&key->nsegments,1,PETSC_INT,0);
 85: 
 86:     /* loop over segments writing them out */
 87:     segment = key->segments;
 88:     while (segment) {
 89:       /* 
 90:          Write out name of segment - use a fixed length for the name in the binary 
 91:          file to make seeking easier
 92:       */
 93:       PetscMemzero(paddedname,256*sizeof(char));
 94:       PetscStrncpy(paddedname,segment->name,255);
 95:       PetscBinaryWrite(fd,paddedname,256,PETSC_CHAR,0);
 96:       PetscBinaryWrite(fd,&segment->bs,1,PETSC_INT,0);
 97:       PetscBinaryWrite(fd,&segment->datatype,1,PETSC_INT,0);
 98:       /* write out the data */
 99:       PetscBinaryWrite(fd,segment->data,N*segment->bs,segment->datatype,0);
100:       segment = segment->next;
101:     }
102:     key = key->next;
103:   }

105:   return(0);
106: }

108: /*
109:       All processors have the same data so processor 0 prints it
110: */
111: #undef __FUNCT__  
113: int AODataView_Basic_ASCII(AOData ao,PetscViewer viewer)
114: {
115:   int               ierr,j,k,l,rank,size,nkeys,nsegs,i,N,bs,zero = 0;
116:   char              *dt,**keynames,**segnames,*stype,*segvalue;
117:   AODataSegment     *segment;
118:   AODataKey         *key = ao->keys;
119:   PetscDataType     dtype;
120:   PetscViewerFormat format;

123:   MPI_Comm_rank(ao->comm,&rank);
124:   if (rank) return(0);
125:   MPI_Comm_size(ao->comm,&size);

127:   PetscViewerGetFormat(viewer,&format);
128:   if (format == PETSC_VIEWER_ASCII_INFO) {
129:     AODataGetInfo(ao,&nkeys,&keynames);
130:     for (i=0; i<nkeys; i++) {
131:       AODataKeyGetInfo(ao,keynames[i],&N,0,&nsegs,&segnames);
132:       PetscViewerASCIIPrintf(viewer,"  %s: (%d)n",keynames[i],N);
133:       for (j=0; j<nsegs; j++) {
134:         AODataSegmentGetInfo(ao,keynames[i],segnames[j],&bs,&dtype);
135:         PetscDataTypeGetName(dtype,&stype);
136:         if (dtype == PETSC_CHAR) {
137:           AODataSegmentGet(ao,keynames[i],segnames[j],1,&zero,(void **)&segvalue);
138:           PetscViewerASCIIPrintf(viewer,"      %s: (%d) %s -> %sn",segnames[j],bs,stype,segvalue);
139:           AODataSegmentRestore(ao,keynames[i],segnames[j],1,&zero,(void **)&segvalue);
140:         } else {
141:           PetscViewerASCIIPrintf(viewer,"      %s: (%d) %sn",segnames[j],bs,stype);
142:         }
143:       }
144:     }
145:     PetscFree(keynames);
146:   } else {
147:     PetscViewerASCIIUseTabs(viewer,PETSC_FALSE);
148:     while (key) {
149:       PetscViewerASCIIPrintf(viewer,"AOData Key: %s Length %d Ownership: ",key->name,key->N);
150:       for (j=0; j<size+1; j++) {PetscViewerASCIIPrintf(viewer,"%d ",key->rowners[j]);}
151:       PetscViewerASCIIPrintf(viewer,"n");

153:       segment = key->segments;
154:       while (segment) {
155:         PetscDataTypeGetName(segment->datatype,&dt);
156:         PetscViewerASCIIPrintf(viewer,"  AOData Segment: %s Blocksize %d datatype %sn",segment->name,segment->bs,dt);
157:         if (segment->datatype == PETSC_INT) {
158:           int *mdata = (int*)segment->data;
159:           for (k=0; k<key->N; k++) {
160:             PetscViewerASCIIPrintf(viewer," %d: ",k);
161:             for (l=0; l<segment->bs; l++) {
162:               PetscViewerASCIIPrintf(viewer,"   %d ",mdata[k*segment->bs + l]);
163:             }
164:             PetscViewerASCIIPrintf(viewer,"n");
165:           }
166:         } else if (segment->datatype == PETSC_DOUBLE) {
167:           PetscReal *mdata = (PetscReal*)segment->data;
168:           for (k=0; k<key->N; k++) {
169:             PetscViewerASCIIPrintf(viewer," %d: ",k);
170:             for (l=0; l<segment->bs; l++) {
171:               PetscViewerASCIIPrintf(viewer,"   %18.16e ",mdata[k*segment->bs + l]);
172:             }
173:             PetscViewerASCIIPrintf(viewer,"n");
174:           }
175:         } else if (segment->datatype == PETSC_SCALAR) {
176:           PetscScalar *mdata = (PetscScalar*)segment->data;
177:           for (k=0; k<key->N; k++) {
178:             PetscViewerASCIIPrintf(viewer," %d: ",k);
179:             for (l=0; l<segment->bs; l++) {
180: #if !defined(PETSC_USE_COMPLEX)
181:               PetscViewerASCIIPrintf(viewer,"   %18.16e ",mdata[k*segment->bs + l]);
182: #else
183:               PetscScalar x = mdata[k*segment->bs + l];
184:               if (PetscImaginaryPart(x) > 0.0) {
185:                 PetscViewerASCIIPrintf(viewer," %18.16e + %18.16e i n",PetscRealPart(x),PetscImaginaryPart(x));
186:               } else if (PetscImaginaryPart(x) < 0.0) {
187:                 PetscViewerASCIIPrintf(viewer,"   %18.16e - %18.16e i n",PetscRealPart(x),-PetscImaginaryPart(x));
188:               } else {
189:                 PetscViewerASCIIPrintf(viewer,"   %18.16e n",PetscRealPart(x));
190:               }
191: #endif
192:             }
193:           }
194:           PetscViewerASCIIPrintf(viewer,"n");
195:         } else if (segment->datatype == PETSC_LOGICAL) {
196:           PetscBT mdata = (PetscBT) segment->data;
197:           for (k=0; k<key->N; k++) {
198:             PetscViewerASCIIPrintf(viewer," %d: ",k);
199:             for (l=0; l<segment->bs; l++) {
200:               PetscViewerASCIIPrintf(viewer,"   %d ",(int)PetscBTLookup(mdata,k*segment->bs + l));
201:             }
202:             PetscViewerASCIIPrintf(viewer,"n");
203:           }
204:         } else if (segment->datatype == PETSC_CHAR) {
205:           char * mdata = (char*)segment->data;
206:           for (k=0; k<key->N; k++) {
207:             PetscViewerASCIIPrintf(viewer,"  %s ",mdata + k*segment->bs);
208:           }
209:           PetscViewerASCIIPrintf(viewer,"n");
210:         } else {
211:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Unknown PETSc data format");
212:         }
213:         segment = segment->next;
214:       }
215:       key = key->next;
216:     }
217:   }
218:   PetscViewerASCIIUseTabs(viewer,PETSC_TRUE);
219:   return(0);
220: }

222: #undef __FUNCT__  
224: int AODataView_Basic(AOData ao,PetscViewer viewer)
225: {
226:   int        rank,ierr;
227:   PetscTruth isascii,isbinary;

230:   MPI_Comm_rank(ao->comm,&rank);
231:   if (rank) return(0);

233:   PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_ASCII,&isascii);
234:   PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_BINARY,&isbinary);
235:   if (isascii) {
236:     AODataView_Basic_ASCII(ao,viewer);
237:   } else if (isbinary) {
238:     AODataView_Basic_Binary(ao,viewer);
239:   } else {
240:     SETERRQ1(1,"Viewer type %s not supported for AOData basic",((PetscObject)viewer)->type_name);
241:   }

243:   return(0);
244: }

246: #undef __FUNCT__  
248: int AODataKeyRemove_Basic(AOData aodata,char *name)
249: {
250:   AODataSegment    *segment,*iseg;
251:   AODataKey        *key,*ikey;
252:   int              ierr;
253:   PetscTruth       flag;

256:   AODataKeyFind_Private(aodata,name,&flag,&key);
257:   if (flag != 1)  return(0);
258:   aodata->nkeys--;

260:   segment = key->segments;
261:   while (segment) {
262:     iseg    = segment->next;
263:     PetscFree(segment->name);
264:     PetscFree(segment->data);
265:     PetscFree(segment);
266:     segment = iseg;
267:   }
268:   ikey = aodata->keys;
269:   if (key == ikey) {
270:     aodata->keys = key->next;
271:     goto finishup1;
272:   }
273:   while (ikey->next != key) {
274:     ikey = ikey->next;
275:   }
276:   ikey->next = key->next;

278:   finishup1:

280:   PetscFree(key->name);
281:   PetscFree(key->rowners);
282:   PetscFree(key);

284:   return(0);
285: }

287: #undef __FUNCT__  
289: int AODataSegmentRemove_Basic(AOData aodata,char *name,char *segname)
290: {
291:   AODataSegment    *segment,*iseg;
292:   AODataKey        *key;
293:   int              ierr;
294:   PetscTruth       flag;

297:   AODataSegmentFind_Private(aodata,name,segname,&flag,&key,&iseg);
298:   if (!flag)  return(0);
299:   key->nsegments--;

301:   segment = key->segments;
302:   if (segment == iseg) {
303:     key->segments = segment->next;
304:     goto finishup2;
305:   }
306:   while (segment->next != iseg) {
307:     segment = segment->next;
308:   }
309:   segment->next = iseg->next;
310:   segment       = iseg;
311: 
312:   finishup2:

314:   PetscFree(segment->name);
315:   PetscFree(segment->data);
316:   PetscFree(segment);
317:   return(0);
318: }


321: #undef __FUNCT__  
323: int AODataSegmentAdd_Basic(AOData aodata,char *name,char *segname,int bs,int n,int *keys,void *data,PetscDataType dtype)
324: {
325:   AODataSegment    *segment,*iseg;
326:   AODataKey        *key;
327:   int              N,size,ierr,*lens,i,*disp,*akeys,datasize,*fkeys,Nb,j;
328:   MPI_Datatype     mtype;
329:   char             *adata;
330:   MPI_Comm         comm = aodata->comm;
331:   PetscTruth       flag;

334:   ierr  = AODataKeyFind_Private(aodata,name,&flag,&key);
335:   if (!flag)  SETERRQ1(PETSC_ERR_ARG_OUTOFRANGE,"Key %s doesn't exist",name);
336:   AODataSegmentFind_Private(aodata,name,segname,&flag,&key,&iseg);
337:   if (flag)  SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Segment %s in key %s already exists",name,segname);

339:   PetscNew(AODataSegment,&segment);
340:   if (iseg) {
341:     iseg->next    = segment;
342:   } else {
343:     key->segments = segment;
344:   }
345:   segment->next     = 0;
346:   segment->bs       = bs;
347:   segment->datatype = dtype;

349:   PetscDataTypeGetSize(dtype,&datasize);

351:   /*
352:      If keys not given, assume each processor provides entire data 
353:   */
354:   if (!keys && n == key->N) {
355:     char *fdata1;
356:     if (dtype == PETSC_LOGICAL) Nb = PetscBTLength(key->N); else Nb = key->N;
357:     PetscMalloc((Nb*bs+1)*datasize,&fdata1);
358:     PetscBitMemcpy(fdata1,0,data,0,key->N*bs,dtype);
359:     segment->data = (void*)fdata1;
360:   } else if (!keys) {
361:     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Keys not given, but not all data given on each processor");
362:   } else {
363:     /* transmit all lengths to all processors */
364:     MPI_Comm_size(comm,&size);
365:     PetscMalloc(2*size*sizeof(int),&lens);
366:     disp = lens + size;
367:     MPI_Allgather(&n,1,MPI_INT,lens,1,MPI_INT,comm);
368:     N =  0;
369:     for (i=0; i<size; i++) {
370:       disp[i]  = N;
371:       N       += lens[i];
372:     }
373:     if (N != key->N) {
374:       SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Did not provide correct number of keys for keyname");
375:     }

377:     /*
378:       Allocate space for all keys and all data 
379:     */
380:     PetscMalloc((N+1)*sizeof(int),&akeys);
381:     PetscMalloc((N*bs+1)*datasize,&adata);

383:     MPI_Allgatherv(keys,n,MPI_INT,akeys,lens,disp,MPI_INT,comm);
384:     for (i=0; i<size; i++) {
385:       disp[i] *= bs;
386:       lens[i] *= bs;
387:     }
388: 
389:     if (dtype != PETSC_LOGICAL) {
390:       char *fdata2;

392:       PetscDataTypeToMPIDataType(dtype,&mtype);
393:       MPI_Allgatherv(data,n*bs,mtype,adata,lens,disp,mtype,comm);
394:       PetscFree(lens);

396:       /*
397:         Now we have all the keys and data we need to put it in order
398:       */
399:       PetscMalloc((key->N+1)*sizeof(int),&fkeys);
400:       PetscMemzero(fkeys,(key->N+1)*sizeof(int));
401:       PetscMalloc((key->N*bs+1)*datasize,&fdata2);

403:       for (i=0; i<N; i++) {
404:         if (fkeys[akeys[i]] != 0) {
405:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Duplicate key");
406:         }
407:         if (fkeys[akeys[i]] >= N) {
408:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Key out of range");
409:         }
410:         fkeys[akeys[i]] = 1;
411:         PetscBitMemcpy(fdata2,akeys[i]*bs,adata,i*bs,bs,dtype);
412:       }
413:       for (i=0; i<N; i++) {
414:         if (!fkeys[i]) {
415:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Missing key");
416:         }
417:       }
418:       segment->data = (void*)fdata2;
419:     } else {
420:       /*
421:             For logical input the length is given by the user in bits; we need to 
422:             convert to bytes to send with MPI
423:       */
424:       PetscBT fdata3,mvalues = (PetscBT) data;
425:       char *values;
426:       PetscMalloc((n+1)*bs*sizeof(char),&values);
427:       for (i=0; i<n; i++) {
428:         for (j=0; j<bs; j++) {
429:           if (PetscBTLookup(mvalues,i*bs+j)) values[i*bs+j] = 1; else values[i*bs+j] = 0;
430:         }
431:       }

433:       MPI_Allgatherv(values,n*bs,MPI_BYTE,adata,lens,disp,MPI_BYTE,comm);
434:       PetscFree(lens);
435:       PetscFree(values);

437:       /*
438:         Now we have all the keys and data we need to put it in order
439:       */
440:       PetscMalloc((key->N+1)*sizeof(int),&fkeys);
441:       PetscMemzero(fkeys,(key->N+1)*sizeof(int));
442:       PetscBTCreate(N*bs,fdata3);

444:       for (i=0; i<N; i++) {
445:         if (fkeys[akeys[i]] != 0) {
446:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Duplicate key");
447:         }
448:         if (fkeys[akeys[i]] >= N) {
449:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Key out of range");
450:         }
451:         fkeys[akeys[i]] = 1;
452:         for (j=0; j<bs; j++) {
453:           if (adata[i*bs+j]) { PetscBTSet(fdata3,i*bs+j); }
454:         }
455:       }
456:       for (i=0; i<N; i++) {
457:         if (!fkeys[i]) {
458:           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Missing key");
459:         }
460:       }
461:       segment->data = (void*)fdata3;
462:     }
463:     PetscFree(akeys);
464:     PetscFree(adata);
465:     PetscFree(fkeys);
466:   }

468:   key->nsegments++;

470:   PetscStrallocpy(segname,&segment->name);
471:   return(0);
472: }

474: #undef __FUNCT__  
476: int AODataSegmentGetExtrema_Basic(AOData ao,char *name,char *segname,void *xmax,void *xmin)
477: {
478:   AODataSegment    *segment;
479:   AODataKey        *key;
480:   int              ierr,i,bs,n,j;
481:   PetscTruth       flag;

484:   /* find the correct segment */
485:   AODataSegmentFind_Private(ao,name,segname,&flag,&key,&segment);
486:   if (!flag) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Cannot locate segment");

488:   n       = key->N;
489:   bs      = segment->bs;

491:   if (segment->datatype == PETSC_INT) {
492:     int *vmax = (int*)xmax,*vmin = (int*)xmin,*values = (int*)segment->data;
493:     for (j=0; j<bs; j++) {
494:       vmax[j] = vmin[j] = values[j];
495:     }
496:     for (i=1; i<n; i++) {
497:       for (j=0; j<bs; j++) {
498:         vmax[j] = PetscMax(vmax[j],values[bs*i+j]);
499:         vmin[j] = PetscMin(vmin[j],values[bs*i+j]);
500:       }
501:     }
502:   } else if (segment->datatype == PETSC_DOUBLE) {
503:     PetscReal *vmax = (PetscReal*)xmax,*vmin = (PetscReal*)xmin,*values = (PetscReal*)segment->data;
504:     for (j=0; j<bs; j++) {
505:       vmax[j] = vmin[j] = values[j];
506:     }
507:     for (i=1; i<n; i++) {
508:       for (j=0; j<bs; j++) {
509:         vmax[j] = PetscMax(vmax[j],values[bs*i+j]);
510:         vmin[j] = PetscMin(vmin[j],values[bs*i+j]);
511:       }
512:     }
513:   } else SETERRQ(PETSC_ERR_SUP,"Cannot find extrema for this data type");

515:   return(0);
516: }

518: #undef __FUNCT__  
520: int AODataSegmentGet_Basic(AOData ao,char *name,char *segname,int n,int *keys,void **data)
521: {
522:   AODataSegment    *segment;
523:   AODataKey        *key;
524:   int              ierr,dsize,i,bs,nb;
525:   char             *idata,*odata;
526:   PetscTruth       flag;
527: 
529:   /* find the correct segment */
530:   AODataSegmentFind_Private(ao,name,segname,&flag,&key,&segment);
531:   if (!flag) SETERRQ2(PETSC_ERR_ARG_WRONG,"Cannot locate segment %s in key %s",segname,name);

533:   ierr  = PetscDataTypeGetSize(segment->datatype,&dsize);
534:   bs    = segment->bs;
535:   if (segment->datatype == PETSC_LOGICAL) nb = PetscBTLength(n); else nb = n;
536:   PetscMalloc((nb+1)*bs*dsize,&odata);
537:   idata = (char*)segment->data;
538:   for (i=0; i<n; i++) {
539:     PetscBitMemcpy(odata,i*bs,idata,keys[i]*bs,bs,segment->datatype);
540:   }
541:   *data = (void*)odata;
542:   return(0);
543: }

545: #undef __FUNCT__  
547: int AODataSegmentRestore_Basic(AOData aodata,char *name,char *segname,int n,int *keys,void **data)
548: {

552:   PetscFree(*data);
553:   return(0);
554: }

556: #undef __FUNCT__  
558: int AODataSegmentGetLocal_Basic(AOData ao,char *name,char *segname,int n,int *keys,void **data)
559: {
560:   int           ierr,*globals,*locals,bs;
561:   PetscDataType dtype;
562:   AODataKey     *key;
563:   PetscTruth    flag;

566:   AODataKeyFind_Private(ao,segname,&flag,&key);
567:   if (!flag) SETERRQ(PETSC_ERR_ARG_WRONG,"Segment does not have corresponding key");
568:   if (!key->ltog) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"No local to global mapping set for key");
569:   AODataSegmentGetInfo(ao,name,segname,&bs,&dtype);
570:   if (dtype != PETSC_INT) SETERRQ(PETSC_ERR_ARG_WRONG,"Datatype of segment must be PETSC_INT");

572:   /* get the values in global indexing */
573:   AODataSegmentGet_Basic(ao,name,segname,n,keys,(void **)&globals);
574: 
575:   /* allocate space to store them in local indexing */
576:   PetscMalloc((n+1)*bs*sizeof(int),&locals);

578:   ISGlobalToLocalMappingApply(key->ltog,IS_GTOLM_MASK,n*bs,globals,PETSC_NULL,locals);

580:   AODataSegmentRestore_Basic(ao,name,segname,n,keys,(void **)&globals);

582:   *data = (void*)locals;
583:   return(0);
584: }

586: #undef __FUNCT__  
588: int AODataSegmentRestoreLocal_Basic(AOData aodata,char *name,char *segname,int n,int *keys,void **data)
589: {

593:   PetscFree(*data);
594:   return(0);
595: }

597: EXTERN int AOBasicGetIndices_Private(AO,int **,int **);

599: #undef __FUNCT__  
601: int AODataKeyRemap_Basic(AOData aodata,char *keyname,AO ao)
602: {
603:   int           ierr,*inew,k,*ii,nk,dsize,bs,nkb;
604:   char          *data,*tmpdata;
605:   AODataKey     *key;
606:   AODataSegment *seg;
607:   PetscTruth    flag,match;


611:   /* remap all the values in the segments that match the key */
612:   key = aodata->keys;
613:   while (key) {
614:     seg = key->segments;
615:     while (seg) {
616:       PetscStrcmp(seg->name,keyname,&match);
617:       if (!match) {
618:         seg = seg->next;
619:         continue;
620:       }
621:       if (seg->datatype != PETSC_INT) {
622:         SETERRQ(PETSC_ERR_ARG_WRONG,"Segment name same as key but not integer type");
623:       }
624:       nk   = seg->bs*key->N;
625:       ii   = (int*)seg->data;
626:       AOPetscToApplication(ao,nk,ii);
627:       seg  = seg->next;
628:     }
629:     key = key->next;
630:   }
631: 
632:   AOBasicGetIndices_Private(ao,&inew,PETSC_NULL);
633:   /* reorder in the arrays all the values for the key */
634:   AODataKeyFind_Private(aodata,keyname,&flag,&key);
635:   if (!flag) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Could not find key");
636:   nk  = key->N;
637:   seg = key->segments;
638:   while (seg) {
639:     ierr    = PetscDataTypeGetSize(seg->datatype,&dsize);
640:     bs      = seg->bs;
641:     data    = (char*)seg->data;
642:     if (seg->datatype == PETSC_LOGICAL) nkb = PetscBTLength(nk*bs); else nkb = nk*bs;
643:     PetscMalloc((nkb+1)*dsize,&tmpdata);

645:     for (k=0; k<nk; k++) {
646:       PetscBitMemcpy(tmpdata,inew[k]*bs,data,k*bs,bs,seg->datatype);
647:     }
648:     PetscMemcpy(data,tmpdata,nkb*dsize);
649:     PetscFree(tmpdata);
650:     seg = seg->next;
651:   }

653:   return(0);
654: }

656: #undef __FUNCT__  
658: int AODataKeyGetAdjacency_Basic(AOData aodata,char *keyname,Mat *adj)
659: {
660:   int           ierr,cnt,i,j,*jj,*ii,nlocal,n,*nb,bs,ls;
661:   AODataKey     *key;
662:   AODataSegment *seg;
663:   PetscTruth    flag;

666:   AODataSegmentFind_Private(aodata,keyname,keyname,&flag,&key,&seg);
667:   if (!flag) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Cannot locate key with neighbor segment");

669:   /*
670:      Get the beginning of the neighbor list for this processor 
671:   */
672:   bs     = seg->bs;
673:   nb     = (int*)seg->data;
674:   nb    += bs*key->rstart;
675:   nlocal = key->rend - key->rstart;
676:   n      = bs*key->N;

678:   /*
679:       Assemble the adjacency graph: first we determine total number of entries
680:   */
681:   cnt = 0;
682:   for (i=0; i<bs*nlocal; i++) {
683:     if (nb[i] >= 0) cnt++;
684:   }
685:   PetscMalloc((nlocal + 1)*sizeof(int),&ii);
686:   PetscMalloc((cnt+1)*sizeof(int),&jj);
687:   ii[0] = 0;
688:   cnt   = 0;
689:   for (i=0; i<nlocal; i++) {
690:     ls = 0;
691:     for (j=0; j<bs; j++) {
692:       if (nb[bs*i+j] >= 0) {
693:         jj[cnt++] = nb[bs*i+j];
694:         ls++;
695:       }
696:     }
697:     /* now sort the column indices for this row */
698:     PetscSortInt(ls,jj+cnt-ls);
699:     ii[i+1] = cnt;
700:   }

702:   MatCreateMPIAdj(aodata->comm,nlocal,n,ii,jj,PETSC_NULL,adj);
703:   return(0);
704: }

706: #undef __FUNCT__
708: int AODataSegmentPartition_Basic(AOData aodata,char *keyname,char *segname)
709: {
710:   int             ierr,size,bs,i,j,*idx,nc,*isc;
711:   AO              ao;
712:   AODataKey       *key,*keyseg;
713:   AODataSegment   *segment;
714:   PetscTruth      flag;


718:   AODataKeyFind_Private(aodata,segname,&flag,&keyseg);
719:   if (!flag) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Cannot locate segment as a key");
720:   PetscMalloc(keyseg->N*sizeof(int),&isc);
721:   PetscMemzero(isc,keyseg->N*sizeof(int));

723:   AODataSegmentFind_Private(aodata,keyname,segname,&flag,&key,&segment);
724:   if (flag != 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Cannot locate segment");
725:   MPI_Comm_size(aodata->comm,&size);

727:   bs                = segment->bs;

729:   idx = (int*)segment->data;
730:   nc  = 0;
731:   for (i=0; i<size; i++) {
732:     for (j=bs*key->rowners[i]; j<bs*key->rowners[i+1]; j++) {
733:       if (!isc[idx[j]]) {
734:         isc[idx[j]] = ++nc;
735:       }
736:     }
737:   }
738:   for (i=0; i<keyseg->N; i++) {
739:     isc[i]--;
740:   }

742:   AOCreateBasic(aodata->comm,keyseg->nlocal,isc+keyseg->rstart,PETSC_NULL,&ao);
743:   PetscFree(isc);

745:   AODataKeyRemap(aodata,segname,ao);
746:   AODestroy(ao);
747:   return(0);
748: }

750: #undef __FUNCT__  
752: int AODataKeyGetActive_Basic(AOData aodata,char *name,char *segname,int n,int *keys,int wl,IS *is)
753: {
754:   int           ierr,i,cnt,*fnd,bs;
755:   AODataKey     *key;
756:   AODataSegment *segment;
757:   PetscBT       bt;
758:   PetscTruth    flag;

761:   AODataSegmentFind_Private(aodata,name,segname,&flag,&key,&segment);
762:   if (!flag) SETERRQ(1,"Cannot locate segment");

764:   bt = (PetscBT) segment->data;
765:   bs = segment->bs;

767:   if (wl >= bs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Bit field (wl) argument too large");

769:   /* count the active ones */
770:   cnt = 0;
771:   for (i=0; i<n; i++) {
772:     if (PetscBTLookup(bt,keys[i]*bs+wl)) {
773:       cnt++;
774:     }
775:   }

777:   PetscMalloc((cnt+1)*sizeof(int),&fnd);
778:   cnt = 0;
779:   for (i=0; i<n; i++) {
780:     if (PetscBTLookup(bt,keys[i]*bs+wl)) {
781:       fnd[cnt++] = keys[i];
782:     }
783:   }
784: 
785:   ISCreateGeneral(aodata->comm,cnt,fnd,is);
786:   PetscFree(fnd);
787:   return(0);
788: }

790: #undef __FUNCT__  
792: int AODataKeyGetActiveLocal_Basic(AOData aodata,char *name,char *segname,int n,int *keys,int wl,IS *is)
793: {
794:   int           ierr,i,cnt,*fnd,bs,*locals;
795:   AODataKey     *key;
796:   AODataSegment *segment;
797:   PetscBT       bt;
798:   PetscTruth    flag;

801:   AODataSegmentFind_Private(aodata,name,segname,&flag,&key,&segment);
802:   if (!flag) SETERRQ(1,"Cannot locate segment");

804:   bt = (PetscBT) segment->data;
805:   bs = segment->bs;

807:   if (wl >= bs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Bit field (wl) argument too large");

809:   /* count the active ones */
810:   cnt = 0;
811:   for (i=0; i<n; i++) {
812:     if (PetscBTLookup(bt,keys[i]*bs+wl)) {
813:       cnt++;
814:     }
815:   }

817:   PetscMalloc((cnt+1)*sizeof(int),&fnd);
818:   cnt = 0;
819:   for (i=0; i<n; i++) {
820:     if (PetscBTLookup(bt,keys[i]*bs+wl)) {
821:       fnd[cnt++] = keys[i];
822:     }
823:   }
824: 
825:   PetscMalloc((n+1)*sizeof(int),&locals);
826:   ISGlobalToLocalMappingApply(key->ltog,IS_GTOLM_MASK,cnt,fnd,PETSC_NULL,locals);
827:   PetscFree(fnd);
828:   ISCreateGeneral(aodata->comm,cnt,locals,is);
829:   PetscFree(locals);
830:   return(0);
831: }

833: EXTERN int AODataSegmentGetReduced_Basic(AOData,char *,char *,int,int*,IS *);
834: EXTERN int AODataPublish_Petsc(PetscObject);

836: static struct _AODataOps myops = {AODataSegmentAdd_Basic,
837:                                   AODataSegmentGet_Basic,
838:                                   AODataSegmentRestore_Basic,
839:                                   AODataSegmentGetLocal_Basic,
840:                                   AODataSegmentRestoreLocal_Basic,
841:                                   AODataSegmentGetReduced_Basic,
842:                                   AODataSegmentGetExtrema_Basic,
843:                                   AODataKeyRemap_Basic,
844:                                   AODataKeyGetAdjacency_Basic,
845:                                   AODataKeyGetActive_Basic,
846:                                   AODataKeyGetActiveLocal_Basic,
847:                                   AODataSegmentPartition_Basic,
848:                                   AODataKeyRemove_Basic,
849:                                   AODataSegmentRemove_Basic,
850:                                   AODataDestroy_Basic,
851:                                   AODataView_Basic};

853: #undef __FUNCT__  
855: /*@C
856:    AODataCreateBasic - Creates an AO datastructure.

858:    Collective on MPI_Comm

860:    Input Parameters:
861: .  comm  - MPI communicator that is to share AO

863:    Output Parameter:
864: .  aoout - the new database

866:    Options Database Keys:
867: +  -ao_data_view - Prints entire database at the conclusion of AODataSegmentAdd()
868: -  -ao_data_view_info - Prints info about database at the conclusion of AODataSegmentAdd()

870:    Level: intermediate

872: .keywords: AOData, create

874: .seealso: AODataSegmentAdd(), AODataDestroy()
875: @*/
876: int AODataCreateBasic(MPI_Comm comm,AOData *aoout)
877: {
878:   AOData    ao;
879:   int       ierr;

883:   *aoout = 0;
884: #ifndef PETSC_USE_DYNAMIC_LIBRARIES
885:   DMInitializePackage(PETSC_NULL);
886: #endif

888:   PetscHeaderCreate(ao,_p_AOData,struct _AODataOps,AODATA_COOKIE,AODATA_BASIC,"AOData",comm,AODataDestroy,AODataView);
889:   PetscLogObjectCreate(ao);
890:   PetscLogObjectMemory(ao,sizeof(struct _p_AOData));

892:   PetscMemcpy(ao->ops,&myops,sizeof(myops));
893:   ao->bops->publish = AODataPublish_Petsc;

895:   ao->nkeys        = 0;
896:   ao->keys         = 0;
897:   ao->datacomplete = 0;

899:   PetscPublishAll(ao);
900:   *aoout = ao;
901:   return(0);
902: }

904: #undef __FUNCT__  
906: /*@C
907:    AODataLoadBasic - Loads an AO database from a file.

909:    Collective on PetscViewer

911:    Input Parameters:
912: .  viewer - the binary file containing the data

914:    Output Parameter:
915: .  aoout - the new database

917:    Options Database Keys:
918: +  -ao_data_view - Prints entire database at the conclusion of AODataLoadBasic()
919: -  -ao_data_view_info - Prints info about database at the conclusion of AODataLoadBasic()

921:    Level: intermediate

923: .keywords: AOData, create, load, basic

925: .seealso: AODataSegmentAdd(), AODataDestroy(), AODataCreateBasic(), AODataView() 
926: @*/
927: int AODataLoadBasic(PetscViewer viewer,AOData *aoout)
928: {
929:   AOData        ao;
930:   int           fd,nkeys,i,ierr,dsize,j,size,rank,Nb;
931:   char          paddedname[256];
932:   AODataSegment *seg = 0;
933:   AODataKey     *key = 0;
934:   MPI_Comm      comm;
935:   PetscTruth    isbinary,flg1;

938:   *aoout = 0;
939:   PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_BINARY,&isbinary);
940:   if (!isbinary) {
941:     SETERRQ(PETSC_ERR_ARG_WRONG,"Viewer must be obtained from PetscViewerBinaryOpen()");
942:   }

944:   PetscObjectGetComm((PetscObject)viewer,&comm);
945:   MPI_Comm_size(comm,&size);
946:   MPI_Comm_rank(comm,&rank);

948:   PetscViewerBinaryGetDescriptor(viewer,&fd);

950:   /* read in number of segments */
951:   PetscBinaryRead(fd,&nkeys,1,PETSC_INT);

953:   PetscHeaderCreate(ao,_p_AOData,struct _AODataOps,AODATA_COOKIE,AODATA_BASIC,"AOData",comm,AODataDestroy,AODataView);
954:   PetscLogObjectCreate(ao);
955:   PetscLogObjectMemory(ao,sizeof(struct _p_AOData) + nkeys*sizeof(void *));

957:   PetscMemcpy(ao->ops,&myops,sizeof(myops));
958:   ao->bops->publish  = AODataPublish_Petsc;

960:   ao->nkeys      = nkeys;

962:   for (i=0; i<nkeys; i++) {
963:     if (!i) {
964:       ierr     = PetscNew(AODataKey,&key);
965:       ao->keys = key;
966:     } else {
967:       PetscNew(AODataKey,&key->next);
968:       key  = key->next;
969:     }
970:     key->ltog = 0;
971:     key->next = 0;

973:     /* read in key name */
974:     PetscBinaryRead(fd,paddedname,256,PETSC_CHAR);
975:     PetscStrallocpy(paddedname,&key->name);
976:     PetscBinaryRead(fd,&key->N,1,PETSC_INT);

978:     /* determine Nlocal and rowners for key */
979:     key->nlocal  = key->N/size + ((key->N % size) > rank);
980:     PetscMalloc((size+1)*sizeof(int),&key->rowners);
981:     MPI_Allgather(&key->nlocal,1,MPI_INT,key->rowners+1,1,MPI_INT,comm);
982:     key->rowners[0] = 0;
983:     for (j=2; j<=size; j++) {
984:       key->rowners[j] += key->rowners[j-1];
985:     }
986:     key->rstart        = key->rowners[rank];
987:     key->rend          = key->rowners[rank+1];

989:     /* loop key's segments, reading them in */
990:     PetscBinaryRead(fd,&key->nsegments,1,PETSC_INT);

992:     for (j=0; j<key->nsegments; j++) {
993:       if (!j) {
994:         ierr          = PetscNew(AODataSegment,&seg);
995:         key->segments = seg;
996:       } else {
997:         PetscNew(AODataSegment,&seg->next);
998:         seg  = seg->next;
999:       }

1001:       /* read in segment name */
1002:       PetscBinaryRead(fd,paddedname,256,PETSC_CHAR);
1003:       PetscStrallocpy(paddedname,&seg->name);

1005:       /* read in segment blocksize and datatype */
1006:       PetscBinaryRead(fd,&seg->bs,1,PETSC_INT);
1007:       PetscBinaryRead(fd,&seg->datatype,1,PETSC_INT);

1009:       /* allocate the space for the data */
1010:       PetscDataTypeGetSize(seg->datatype,&dsize);
1011:       if (seg->datatype == PETSC_LOGICAL) Nb = PetscBTLength(key->N*seg->bs); else Nb = key->N*seg->bs;
1012:       PetscMalloc(Nb*dsize,&seg->data);
1013:       /* read in the data */
1014:       PetscBinaryRead(fd,seg->data,key->N*seg->bs,seg->datatype);
1015:       seg->next = 0;
1016:     }
1017:   }
1018:   *aoout = ao;

1020:   PetscOptionsHasName(PETSC_NULL,"-ao_data_view",&flg1);
1021:   if (flg1) {
1022:     AODataView(ao,PETSC_VIEWER_STDOUT_(comm));
1023:   }
1024:   PetscOptionsHasName(PETSC_NULL,"-ao_data_view_info",&flg1);
1025:   if (flg1) {
1026:     PetscViewerPushFormat(PETSC_VIEWER_STDOUT_(comm),PETSC_VIEWER_ASCII_INFO);
1027:     AODataView(ao,PETSC_VIEWER_STDOUT_(comm));
1028:     PetscViewerPopFormat(PETSC_VIEWER_STDOUT_(comm));
1029:   }
1030:   PetscPublishAll(ao);
1031:   return(0);
1032: }