/* * artproc.c Copyright 1999 Christopher M Sedore. All Rights Reserved. * Please see the "COPYING" file for license details. * * These functions process articles that arrive via NNTP. */ #include "main.h" static int t=0,tt=0; static int maxArtAge=7*24*60*60; static time_t ct,oct; extern time_t curtime; void *incDist; struct artjob { struct myaiocb cb; struct article *art; u_quad_t key; TAILQ_ENTRY(artjob) list; char filterResponse[16]; }; TAILQ_HEAD(filtheadn,artjob) filterWaitQ; int filterWait=0; int fdExternalFilter; int externalFilterActive; extern int pceactive; /* * ArtArrived() does all the work necessary after an article has arrived * and has been written to disk. This includes flagging it for feeds, * writing it to the feed database, and waking feeds waiting for articles. */ void ArtArrived(struct artjob *aj) { t++; tt++; assert(aj!=NULL); assert(aj->art!=NULL); assert(aj->key!=0); FlagForSites(aj->art); FeedDBRecord(aj->art,aj->key); SiteConnWakeup(); } /* * ArtStoreDone() is a callback from ArtStore. This is called after the * article is asynchronously written to disk. It calls ArtArrived() to * flag the article for feeds, store it in the feed db, and kick any * feeds waiting for articles. After this completes, it frees * associated data. * * Future enhancements might include allowing nocheck feeds to reference * this buffer if they are waiting for articles so the article doesn't * have to be read back from disk (or, if the machine is really memory * rich, the articles could be put in a LRU cache to get dumped by * a caching mechanism). */ void ArtStoreDone(struct artjob *aj, int len) { assert(aj!=NULL); assert(aj->art!=NULL); assert(len==aj->art->len); ArtArrived(aj); FreeMatchData(aj->art); free(aj->art->bufp); free(aj->art); free(aj); } /* * ArtStore() is responsible for finding a store for the article, and * then setting up an (async) write with ArtStoreDone as a callback. * * This should be the last consumer in the chain, since it will always * reject or consume an article. */ int ArtStore(struct article *art) { struct artjob *aj; int x; x=FindStore(art); if (x<0) { printf("no store\n"); return CONSUMER_REJECT; } aj=malloc(sizeof(struct artjob)); bzero(aj,sizeof(struct artjob)); aj->art=art; aj->cb.cb.aio_buf=art->buf; aj->cb.cb.aio_nbytes=art->len; aj->cb.callback=(void *)ArtStoreDone; StoreCheckAndWrap(x); if (WriteToStore(x,&aj->key,(struct myaiocb *)aj)) { /* What should we do now?? We couldn't write it, but * that's not because of an article problem. Oh well, * log it and return an accepted code. */ syslog(LOG_NEWS|LOG_WARNING, "could not write %s to %u, store write failed.", aj->art->mid,x); FreeMatchData(aj->art); free(aj->art->bufp); free(aj->art); free(aj); } return CONSUMER_OK; } extern int artsFed; int oArtsFed=0; extern int pickups,totalpickedup; void PrintRecvStats() { char killsignal[MAX_PATH]; time(&ct); if (oct) { printf("%u per sec (%u/%u) - %u total, %i pce active\n",t/(ct-oct),t,ct-oct,tt,pceactive); fflush(stdout); t=0; sprintf(killsignal, "%s/KILL", GetConfigString("SignalDataPath")); if(access(killsignal, F_OK) == 0) { unlink(killsignal); Shutdown(); } } if (oArtsFed) { t=artsFed-oArtsFed; printf("%u per sec (%u/%u) - %u total\n",t/(ct-oct),t,ct-oct,artsFed); fflush(stdout); } if (pickups) { float f,f1,f2; f1=pickups; f2=totalpickedup; printf("%f avg picked up\n",f2/f1); } oArtsFed=artsFed; oct=ct; t=0; } static int xrefStrip=1; static char *pathEntry="static.maxwell.syr.edu!"; static char *commonpathEntry="common.maxwell.syr.edu!"; static char fullpathEntry[255]="static.maxwell.syr.edu!common.maxwell.syr.edu!"; static int pathEntryLength; static int fullpathEntryLength; /* * PathLineEdit() does two things: it removes the xref line (if configured * to do so, and the line exists), and it prepends our path entry on the * path line. We reserve 255 bytes ahead of each article so that we have * room to shift the leading part of the article backwards to make space * for our stamp. */ int PathLineEdit(struct article *art) { int b,l,c; char s[4096]; if (xrefStrip) { if (GetHeaderRangeFull(art,"Xref: ",&b,&l,&c)) { memmove(&art->buf[l],art->buf,b); art->len-=l; art->buf+=l; } } if (GetHeaderRange(art,"Path: ",&b,&l,&c)) { (void)strncpy((char *)s, (char *)art->buf+b, l); if (strstr(s,commonpathEntry)) { memmove(art->buf-pathEntryLength,art->buf,b); memcpy(&art->buf[b]-pathEntryLength,pathEntry, pathEntryLength); art->buf-=pathEntryLength; art->len+=pathEntryLength; } else { memmove(art->buf-fullpathEntryLength,art->buf,b); memcpy(&art->buf[b]-fullpathEntryLength,fullpathEntry, fullpathEntryLength); art->buf-=fullpathEntryLength; art->len+=fullpathEntryLength; } } else { return CONSUMER_REJECT; } return CONSUMER_IGNORE; } /* * Check the Date: header on the article to make sure it is within bounds. * If not, reject the article. */ int ArtDateCheck(struct article *art) { char bdate[128]; time_t t; if (GetHeader(art,"Date: ",bdate,127,art->len)) { art->artTime=t=parsedate(bdate,NULL); if (tart->len+5); cc->state="wfilt"; cc->cb.cb.aio_fildes=fdExternalFilter; cc->cb.cb.aio_buf=cc->art->buf; cc->cb.cb.aio_nbytes=cc->art->len+5; cc->cb.callback=(void *)ArtExternalFilterWriteDone; if (aio_write((struct aiocb *)cc)) { /* What should we do now?? We couldn't write it, but * that's not because of an article problem. Oh well, * log it and pretend it filtered OK. */ printf("write failed\n"); syslog(LOG_NEWS|LOG_WARNING, "could not write %s to exernal filter", cc->art->mid); cc->cb.cb.aio_fildes=cc->fd; FindConsumer(cc); ArtReadRespond(cc); NextIo(cc); } return; } void ArtExternalFilterWriteDone(struct context *cc, int result) { printf("wrote %i bytes to ext filt\n",result); if (result!=cc->art->len+5) { cc->cb.cb.aio_fildes=cc->fd; FindConsumer(cc); ArtReadRespond(cc); NextIo(cc); return; } cc->state="rfilt"; cc->cb.cb.aio_fildes=fdExternalFilter; cc->cb.cb.aio_buf=cc->art->filterResponse; cc->cb.cb.aio_nbytes=5; cc->cb.callback=(void *)ArtExternalFilterReadDone; if (aio_read((struct aiocb *)cc)) { /* What should we do now?? We couldn't read the result. * pretend it passed the filter OK. */ syslog(LOG_NEWS|LOG_WARNING, "could not read ext filter result for %s.", cc->art->mid); strcpy(cc->art->filterResponse,"335\r\n"); ArtExternalFilterReadDone(cc,5); } return; } void ArtExternalFilterReadDone(struct context *cc, int result) { cc->state="procart"; cc->cb.cb.aio_fildes=cc->fd; printf("read %s (%i) back from ext filt\n",cc->art->filterResponse,result); if (result!=5) { syslog(LOG_NEWS|LOG_WARNING, "external filter read result was %i, not 5", result); FindConsumer(cc); ArtReadRespond(cc); NextIo(cc); return; } if (atoi(cc->art->filterResponse)==335) { FindConsumer(cc); } else { cc->flags|=NNTP_BAD_ARTICLE; } ArtReadRespond(cc); NextIo(cc); return; } int ExternalFilterInit(char *cmdLine) { int mine,filter[2]; if (pipe(&filter[0])) { syslog(LOG_NEWS|LOG_WARNING, "external filter pipe creation failed"); return -1; } if (fork()) { fdExternalFilter=filter[0]; externalFilterActive=1; return 0; } else { int x=-1; char *argv[32]; while (++x<1024) { if (x==filter[1]) continue; if (x==2) continue; close(x); } dup2(filter[1],0); dup2(filter[1],1); x=0; argv[x]=strtok(cmdLine," "); while (argv[++x]=strtok(NULL," ")) { if (x>30) break; } if (execvp(argv[0],&argv[1])) { perror("execvp"); } exit(-1); } } int ArtProcInit() { char *filtercmd; RegisterArticleConsumer(PathLineEdit,0); RegisterArticleConsumer(ArtDateCheck,0); RegisterArticleConsumer(ArtStore,1); ScheduleCallback(5,9999999,PrintRecvStats,0); GetConfigInt("MaxArticleAge",&maxArtAge); filtercmd=GetConfigString("ExternalFilter"); if (filtercmd) { ExternalFilterInit(filtercmd); } pathEntry=(char *)GetConfigString("PathEntry"); commonpathEntry=(char *)GetConfigString("CommonPathEntry"); if (pathEntry==NULL) { fprintf(stderr,"PathEntry not set in globals section\n"); syslog(LOG_NEWS|LOG_ERR,"PathEntry not set in globals section"); exit(1); } if (commonpathEntry==NULL) { syslog(LOG_NEWS|LOG_INFO,"CommonPathEntry not set in globals section"); strcpy(fullpathEntry, pathEntry); } else sprintf(fullpathEntry,"%s%s", pathEntry, commonpathEntry); pathEntryLength=strlen(pathEntry); fullpathEntryLength=strlen(fullpathEntry); TAILQ_INIT(&filterWaitQ); if (GetConfigString("IncomingFilter")) { incDist=CompileMatchString(GetConfigString("IncomingFilter")); RegisterArticleConsumer(IncomingFilterCheck,0); } }