/* * sfeed.c Copyright 1999 Christopher M Sedore. All Rights Reserved. * Please see the "COPYING" file for license information. * * This file implements outgoing feeds. This feeder is quite * simple-minded in that it is lock step--it doesn't support * out of order responses. It should probably rewritten to * be smarter. All the lower-level io support is there, its just * simple matter of application logic :) * * Each destination for a feed is a "site". Sites have "connections" * that exchange nntp sequences with the site. */ #include "main.h" int artsFed=0; TAILQ_HEAD(feedtqh,ofeed) headArtWait; TAILQ_HEAD(livesitetqg,osite) activeSites; TAILQ_HEAD(connwhead,context) waitNewArticles; struct osite { char hostname[255]; int port; int num; int mark; int flags; int accept,refuse,reject,errors,flaggedfor,missing; int errc; int numConnections,connected; int maxQueue; void *dist; char *diststr; TAILQ_HEAD(chead,context) conns; TAILQ_ENTRY(osite) list; }; int FeedMain(struct context *); int FeedConnError(struct context *); /* * Wakeup sites waiting for new articles. This is called from the * callback that have just finished putting the article on disk. */ void SiteConnWakeup() { struct context *cc,*cn; for (cc=TAILQ_FIRST(&waitNewArticles);cc;cc=cn) { cn=TAILQ_NEXT(cc,list); assert(cc->callback!=NULL); // printf("wakeup site\n"); TAILQ_REMOVE(&waitNewArticles,cc,list); if (cc->flags & NNTP_STREAMING) { if (StreamFeed(cc)) { NextIo(cc); return; } } else { if (IhaveFeed(cc)) { NextIo(cc); return; } } } } /* * SiteFind() * * Find a site by name. */ struct osite * SiteFind(char *name) { struct osite *s; for (s=TAILQ_FIRST(&activeSites);s;s=TAILQ_NEXT(s,list)) { if (!strcasecmp(name,s->hostname)) return s; } return NULL; } /* * SiteMarkAll() * * This sets a flag on each site. The flag was intended to be used to * determine which sites had been "recreated" by a re-read of the config * files. Any site still marked is one that wasn't in the edited files * and needs to be deleted. */ struct osite * SiteMarkAll() { struct osite *s; for (s=TAILQ_FIRST(&activeSites);s;s=TAILQ_NEXT(s,list)) { s->mark=1; } return NULL; } struct osite * SiteDeleteMarked() { struct osite *s,*n; for (s=TAILQ_FIRST(&activeSites);s;s=n) { n=TAILQ_NEXT(s,list); if (s->mark) SiteDelete(s->hostname); } return NULL; } /* * SiteCreate() * * Create a new outgoing site. */ int SiteCreate(int num,char *hostname) { struct osite *s; int destinationport; if (SiteFind(hostname)) { return -1; } s=malloc(sizeof(struct osite)); bzero(s,sizeof(struct osite)); strcpy(s->hostname,hostname); s->dist=NULL; s->diststr=NULL; s->num=num; GetConfigInt("DestinationPort",&destinationport); if(destinationport==0 || destinationport<1) { printf("No DestinationPort defined - defaulting to 119"); destinationport=119; } s->port=destinationport; if (GetConfigInt("DefaultMaxQueue",&s->maxQueue)==NULL) { s->maxQueue=2000000; } if (s->maxQueue<5) { s->maxQueue=5; } DnsAddName(hostname); TAILQ_INIT(&s->conns); TAILQ_INSERT_TAIL(&activeSites,s,list); return 0; } int SiteDelete(char *name) { struct osite *s; s=SiteFind(name); if (!s) return -1; s->flags|=NNTP_FEED_CLOSE; if (s->numConnections) { SiteSet(name,"connections","0"); } else { TAILQ_REMOVE(&activeSites,s,list); free(s); } return 0; } /* * SiteSet() * * Set a parameter for site. */ int SiteSet(char *name,char *item,char *value) { struct osite *s; struct context *c,*cn; s=SiteFind(name); if (s==NULL) return -1; if (!strcasecmp(item,"dist")) { char *t; if (s->diststr) { if (!strcmp(s->diststr,value)) { return 0; } free(s->diststr); } s->diststr=strdup(value); s->dist=CompileMatchString(value); return 0; } if (!strcasecmp(item,"port")) { s->port=atoi(value); return 0; } if (!strcasecmp(item,"connections")) { int nc=atoi(value); if ((nc<0) || (nc>20)) return -1; if (ncnumConnections) { while ((c=TAILQ_FIRST(&s->conns)) && (nc-s->numConnections)) { TAILQ_REMOVE(&s->conns,c,clist); c->flags|=NNTP_FEED_CLOSE; nc--; } s->numConnections=nc; } else { while (nc>s->numConnections) { StartConn(s); s->numConnections++; } } return 0; } if (!strcasecmp(item,"maxqueue")) { unsigned int mq=atoi(value); if (mq<5) mq=5; s->maxQueue=mq; return 0; } return -1; } /* * FlagForSites() * * Given an article, check the distribution for each site set a bit flag * for it if it matches (no distribution set == send everything). */ int FlagForSites(struct article *art) { struct osite *s; int dist; for (s=TAILQ_FIRST(&activeSites);s;s=TAILQ_NEXT(s,list)) { dist=1; if (s->dist) { dist=CheckMatch(art,s->dist); /* if ((dist) && ((MatchHeaderSpec("Newsgroups:","*binaries*",art)) || (art->len>10000))) { abort(); } */ } if (dist) { FeedFlagSite(s->num,art->distflags); s->flaggedfor++; } } } /* * StartConn() * * Create a new connection to a site. */ int StartConn(struct osite *s) { int c; struct context *oc; int addr; addr=DnsGetAddr(s->hostname); if (addr==0) { ScheduleCallback(30,0,StartConn,s); return; } // printf("addr=%s\n",inet_ntoa(addr)); c=AsyncConnect(addr,s->port); if (c<0) { ScheduleCallback(30,0,StartConn,s); return; } // printf("connecting to %s on %i\n",s->hostname,c); oc=malloc(sizeof(struct context)); bzero(oc,sizeof(struct context)); oc->site=s; oc->bufsz=8192; oc->obufsz=8192; oc->cb.cb.aio_fildes=c; oc->fd=c; { char fname[64]; sprintf(fname,"/tmp/%u.%u",getpid(),c); // oc->cb.logfd=open(fname,O_CREAT|O_TRUNC|O_RDWR); } oc->callback=FeedMain; oc->flags|=NNTP_FEED_START; oc->bp=oc->buf; oc->errorCallback=(int (*)(void *))FeedConnError; oc->state="connecting"; TAILQ_INSERT_TAIL(&s->conns,oc,clist); NextIo(oc); } /* * StreamFeed() * * Send a bunch of "check " commands. */ int StreamFeed(struct context *oc) { int x,y; char *s,*b; x=FeedDBGetArticles(oc->site->num,32,&oc->pae,oc->site->maxQueue); if (x<0) { return x; // XXX XXX XXX } if (!x) { oc->state="artwait"; TAILQ_INSERT_HEAD(&waitNewArticles,oc,list); return x; } oc->state="check"; y=0; s=&oc->obuf[oc->obuflen]; while ((yobufobufsz-(MAX_ARTICLEID+8))) { *s='c'; s++; *s='h'; s++; *s='e'; s++; *s='c'; s++; *s='k'; s++; *s=' '; s++; b=oc->pae[y]->mid; while (*b) { *s=*b; s++; b++; } *s='\r'; s++; *s='\n'; s++; UpdatePrecommit(HashArticleID(oc->pae[y]->mid,0),oc->pae[y]); y++; oc->outstanding++; } if ((y) && (ypae[y-1]); assert(s-oc->obuf<=oc->obufsz); oc->obuflen=s-oc->obuf; oc->lastbatch=y; return x; } /* * IhaveFeed() * * Send an "ihave " */ int IhaveFeed(struct context *oc) { int x,y; char *s,*b; struct artent ae; assert(oc->obuflen==0); x=FeedDBGetArticles(oc->site->num,1,&oc->pae,oc->site->maxQueue); if (x<0) { return x; // XXX XXX XXX } if (!x) { oc->state="artwait"; TAILQ_INSERT_HEAD(&waitNewArticles,oc,list); return x; } // UpdatePrecommit(HashArticleID(oc->pae[0]->mid,0),oc->pae[0]); oc->state="ihave"; y=0; s=&oc->obuf[oc->obuflen]; while ((yobufpae[y]->mid; while (*b) { *s=*b; s++; b++; } *s='\r'; s++; *s='\n'; s++; y++; oc->outstanding++; } oc->obuflen=s-oc->obuf; return x; } /* * TakethisTransmit() * * We got a "238 "--try to send the associated article. */ int TakethisTransmit(struct context *oc) { int x,y; struct artent ae; if (GetArticleID(oc)) { return -1; } y=0; if (!GetPrecommitArtEnt(oc->mid64,&ae)) { oc->site->missing++; return -1; } // assert(FeedCheckFlags(oc->site->num,ae.siteflags)); sprintf(&oc->obuf[oc->obuflen],"takethis %s\r\n",oc->mid); x=oc->obuflen; oc->obuflen+=strlen(&oc->obuf[oc->obuflen]); if (CopyArt(oc,&ae)) { oc->site->missing++; oc->obuflen=x; return -1; } artsFed++; oc->state="takethis"; oc->outstanding++; return 0; } /* * IhaveTransmit() * * We got a "335"--send the article we recently IHAVE'd */ void IhaveTransmit(struct context *oc) { assert(oc->obuflen==0); oc->outstanding++; if (CopyArt(oc,oc->pae[0])) { oc->site->missing++; sprintf(&oc->obuf[oc->obuflen],".\r\n"); oc->obuflen+=strlen(&oc->obuf[oc->obuflen]); NextIo(oc); return; } artsFed++; oc->state="ihavesnd"; } /* * FeedConnError() * * Callback for when an "error" occurs. This might be an actual error * (reset connection, etc), or a connection gracefully closed by the * remote host. */ int FeedConnError(struct context *oc) { TAILQ_REMOVE(&oc->site->conns,oc,clist); if (!(oc->flags & NNTP_FEED_START)) oc->site->connected--; if (oc->site->flags & NNTP_FEED_CLOSE) { TAILQ_REMOVE(&oc->site->conns,oc,clist); if (TAILQ_FIRST(&oc->site->conns)==NULL) { free(oc->site); } return; } if (!(oc->flags & NNTP_FEED_CLOSE)) { // printf("reconnecting\n"); ScheduleCallback(10,0,StartConn,oc->site); } TAILQ_REMOVE(&oc->site->conns,oc,clist); } /* * FeedMain() * * The main upcall. This receives responses from the remote site, * executes commands, and does basic checking to see if we need to * send another command. */ int FeedMain(struct context *oc) { char *s; int ret=0; if (oc->flags & NNTP_FEED_START) { /* our read may come back empty because we were awoken * when the connection completed, but before any data * could be sent */ if (oc->buflen==0) { if (!oc->cmdcount) { oc->connerror=0; oc->cmdcount++; return; } FLAG_FOR_CONNECTION_CLOSE(oc); return 0; } ret=atoi(oc->buf); if ((ret>199) && (ret<210)) { int val=0; if (ioctl(oc->fd,FIONBIO,&val)) { perror("ioctl"); } oc->state="mode"; strcpy(oc->obuf,"mode stream\r\n"); oc->obuflen+=strlen(oc->obuf); oc->flags&=~NNTP_FEED_START; oc->buflen=0; oc->site->connected++; } else { printf("oc->buf=%s,oc->bp=%s,oc->buflen=%i\n",oc->buf,oc->bp,oc->buflen); FLAG_FOR_CONNECTION_CLOSE(oc); } return 0; } oc->state="loop"; if (oc->buflen) { switch (atoi(oc->bp)) { case 431 : case 435 : case 438 : { oc->site->refuse++; oc->outstanding--; } break; case 238 : { if (!TakethisTransmit(oc)) { ret=1; } oc->outstanding--; } break; case 235 : case 239 : { oc->site->accept++; oc->outstanding--; } break; case 437: case 439 : { oc->site->reject++; oc->outstanding--; } break; case 335 : { IhaveTransmit(oc); oc->outstanding--; ret=1; } break; case 203 : { oc->flags|=NNTP_STREAMING; } break; case 205: { FLAG_FOR_CONNECTION_CLOSE_AOP(oc); } break; case 200 : case 201 : break; default: { oc->site->errors++; oc->connerror++; if ((!(oc->flags & NNTP_STREAMING)) && (oc->connerror>1)) { FLAG_FOR_CONNECTION_CLOSE(oc); return 0; } if (oc->outstanding) oc->outstanding--; if (oc->connerror>5) { FLAG_FOR_CONNECTION_CLOSE(oc); return 0; } printf("err: %s\n",oc->bp); } break; } //printf("len=%i,code=%i,outs=%u\n",oc->buflen,atoi(oc->bp),oc->outstanding); //printf("==>%s",oc->bp); if (ret) { return ret; } } // printf("outstanding=%i\n",oc->outstanding); if (!oc->outstanding) { if (oc->flags & NNTP_STREAMING) { if (!StreamFeed(oc)) { return 1; } } else { if (!IhaveFeed(oc)) { return 1; } } } return 0; } void NntpFeedOut(struct context *cc) { char *s,*p,*v; s=strtok(cc->bp+7,"\t "); if (!*s) { goto error; } if (!strcasecmp(s,"create")) { s=strtok(NULL,"\t "); p=strtok(NULL,"\t "); if ((!p) || (!s) || (atoi(s)==0)) goto error; if (SiteCreate(atoi(s),p)==-1) { strcpy(&cc->obuf[cc->obuflen],"500 failed\r\n"); cc->obuflen+=strlen(&cc->obuf[cc->obuflen]); } else { sprintf(&cc->obuf[cc->obuflen],"%s created as %i\r\n", p,atoi(s)); cc->obuflen+=strlen(&cc->obuf[cc->obuflen]); } return; } if (!strcasecmp(s,"set")) { s=strtok(NULL,"\t "); p=strtok(NULL,"\t "); v=strtok(NULL,"\n"); if (SiteSet(s,p,v)==-1) { strcpy(&cc->obuf[cc->obuflen],"500 failed\r\n"); cc->obuflen+=strlen(&cc->obuf[cc->obuflen]); } else { sprintf(&cc->obuf[cc->obuflen],"%s set to %s for %s\r\n", p,v,s); cc->obuflen+=strlen(&cc->obuf[cc->obuflen]); } return; } error: strcpy(&cc->obuf[cc->obuflen],"500 feedout [create num name]|[set name param value]\r\n"); cc->obuflen+=strlen(&cc->obuf[cc->obuflen]); return; } void PrintStats() { struct osite *s; struct context *c; for (s=TAILQ_FIRST(&activeSites);s;s=TAILQ_NEXT(s,list)) { printf("%s - accepted %u refused %u rejected %u errors %u flagged %u\n\t\tmissing %u connected %u\n", s->hostname,s->accept,s->refuse,s->reject,s->errors,s->flaggedfor, s->missing,s->connected); for (c=TAILQ_FIRST(&s->conns);c;c=TAILQ_NEXT(c,clist)) { printf("\tconn state=%s (%s) (%i)\n" ,c->state,c->cb.state,c->outstanding); } } } void FeedInit() { TAILQ_INIT(&waitNewArticles); TAILQ_INIT(&activeSites); ScheduleCallback(30,9999999,PrintStats,0); }