/*
 * SERVER.C - /news/dserver.hosts management
 *
 * (c)Copyright 1998, Matthew Dillon, All Rights Reserved.  Refer to
 * the COPYRIGHT file in the base directory of this distribution
 * for specific rights granted.
 */

#include "defs.h"

/*
 * Declarations tagged with the project's Prototype macro.
 * NOTE(review): presumably these are the symbols exported to other
 * source files -- confirm against the definition of Prototype.
 */
Prototype void CheckServerConfig(time_t t, int force);
Prototype void LogServerInfo(Connection *conn, int fd);
Prototype void NNArticleRetrieveByMessageId(Connection *conn, const char *msgid, int grouphint);
Prototype void NNServerIdle(Connection *conn);
Prototype void NNServerTerminate(Connection *conn);
Prototype void NNFinishSReq(Connection *conn, const char *ctl, int requeue);
Prototype void NNServerRequest(Connection *conn, const char *grp, const char *msgid, FILE *cache, int req);

Prototype int ServersTerminated;
Prototype int NReadServers;
Prototype int NReadServAct;
Prototype int NWriteServers;
Prototype int NWriteServAct;

/*
 * Forward declarations for functions defined later in this file.
 */
void NNServerStart(Connection *conn);
void QueueServerRequests(void);
int queueServerRequest(ServReq *sreq, int type, int maxq);
void queueDesc(ServReq *sreq, ForkDesc *desc);
int requeueServerRequest(Connection *conn, ServReq *sreq, int type, int maxq);
int AddServer(char *bindinfo, int txbufsize, int rxbufsize, const char *host, int type, int port, int flags, int pri, char *groups, const char *localspool, HashFeed *hashFeed);
void NNServerPrime1(Connection *conn);
void NNServerPrime2(Connection *conn);
void NNServerPrime3(Connection *conn);
void NNServerPrime4(Connection *conn);
void NNServerConnect(Connection *conn);

/*
 * Pending request lists.  SReadBase/SWriteBase are the list heads for
 * retrieve and post requests that have not been handed to a server yet;
 * PSRead/PSWrite point at the link field where the next request is
 * appended (initially the head itself, i.e. an empty list).
 */
ServReq *SReadBase;
ServReq **PSRead = &SReadBase;
ServReq *SWriteBase;
ServReq **PSWrite = &SWriteBase;

/*
 * Server accounting.  N{Read,Write}Servers is bumped when a spool/post
 * server is added and dropped when one is terminated.  The *ServAct
 * counters are bumped while a server is starting up and for every
 * request queued to a server.  ServersTerminated counts terminated
 * servers and failed adds; it is reset when the configuration file is
 * re-read.
 */
int NReadServers;
int NReadServAct;
int NWriteServers;
int NWriteServAct;
int ServersTerminated;

/*
 * CheckServerConfig() - determine if configuration file has changed and
 *			 resynchronize with servers list.
 *
 *			 NOTE: a server cannot be destroyed until its current
 *			 pending request completes, but no new requests will
 *			 be queued to it during that time.
*/ void setServerMaybeCloseFlag(ForkDesc *desc) { Connection *conn = desc->d_Data; conn->co_Flags |= COF_MAYCLOSESRV; if (conn->co_Func == NNServerIdle) /* wakeup idle server */ FD_SET(desc->d_Fd, &RFds); } void setServerClosedRemovals(ForkDesc *desc) { Connection *conn; if ((conn = desc->d_Data) != NULL) { if (conn->co_Flags & COF_MAYCLOSESRV) { NNServerTerminate(conn); } } } void CheckServerConfig(time_t t, int force) { static struct stat St = { 0 }; struct stat st; const char *path = PatLibExpand(DServerHostsPat); /* * Assuming we can stat the file, if we haven't checked the server config * before or the server config time is different from when we last checked * AND the server config time is not now (so we don't read the file while * someone is writing it), then read the configuration file (again). * I've also got some reverse-time-index protection in there. * * NOTE: path only valid until next Pat*Expand() call, so be careful * with it's use. */ if (force || (stat(path, &st) == 0 && (St.st_mode == 0 || st.st_mtime != St.st_mtime) && ((long)(t - st.st_mtime) > 2 || (long)(t - st.st_mtime) < 0)) ) { FILE *fi; if (force) stat(path, &st); memcpy(&St, &st, sizeof(St)); if ((fi = fopen(path, "r")) != NULL) { char buf[PATH_MAX]; ServersTerminated = 0; ScanThreads(THREAD_SPOOL, setServerMaybeCloseFlag); ScanThreads(THREAD_POST, setServerMaybeCloseFlag); while (fgets(buf, sizeof(buf), fi) != NULL) { char *host = strtok(buf, " \t\n"); char *flags = ""; char c; int addAsServer = 0; int addAsPoster = 0; int port = 119; int nflags = 0; int npri = 0; char *option; char *bindinfo = NULL; char *groups = NULL; char *localspool = NULL; HashFeed hash = { 0 }; int txbufsize = 0; int rxbufsize = 0; if (host == NULL || host[0] == '#') continue; if ((flags = strtok(NULL, " \t\n")) == NULL) flags = ""; while ((c = *flags) != 0) { ForkDesc *desc = NULL; switch(c) { case 'p': /* port */ port = strtol(flags + 1, NULL, 0); if (port == 0) port = 119; break; case 's': /* spool */ npri = 
strtol(flags + 1, NULL, 10); if ((desc = FindThreadId(THREAD_SPOOL, host)) != NULL) { Connection *conn = desc->d_Data; conn->co_Flags &= ~COF_MAYCLOSESRV; desc->d_Pri = npri; } else { addAsServer = 1; } break; case 'M': nflags |= COF_MODEREADER; break; case 'R': nflags |= COF_READONLY; break; case 'o': /* outgoing (post) */ npri = strtol(flags + 1, NULL, 10); if ((desc = FindThreadId(THREAD_POST, host)) != NULL) { Connection *conn = desc->d_Data; conn->co_Flags &= ~COF_MAYCLOSESRV; desc->d_Pri = npri; } else { addAsPoster = 1; } break; default: /* * ignore unknown flag or number */ break; } ++flags; } while ((option = strtok(NULL, " \t\n")) != NULL) { if (strncmp(option, "bind=", 5) == 0) { bindinfo = option + 5; } else if (strncmp(option, "txbufsize=", 10) == 0) { txbufsize = strtol(option + 10, NULL, 0); } else if (strncmp(option, "rxbufsize=", 10) == 0) { rxbufsize = strtol(option + 10, NULL, 0); } else if (strncmp(option, "groups=", 7) == 0) { groups = option + 7; } else if (strncmp(option, "localspool=", 11) == 0) { localspool = option + 11; } else if (strncmp(option, "hash=", 5) == 0) { char *p; option += 5; if ((p = strchr(option, '-')) != NULL) { hash.hf_Begin = strtol(option, NULL, 0); hash.hf_End = strtol(++p, NULL, 0); if ((p = strchr(option, '/')) != NULL) hash.hf_Mod = strtol(++p, NULL, 0); } else { hash.hf_Begin = hash.hf_End = strtol(option, NULL, 0); if ((p = strchr(option, '/')) != NULL) hash.hf_Mod = strtol(++p, NULL, 0); } } else { logit(LOG_ERR, "Unknown option '%s' - ignoring", option); } } if (addAsServer) { if (AddServer(bindinfo, txbufsize, rxbufsize, host, THREAD_SPOOL, port, nflags, npri, groups, localspool, &hash) == 0) { ++NReadServers; ++NReadServAct; } else { ++ServersTerminated; } } if (addAsPoster) { if (AddServer(bindinfo, txbufsize, rxbufsize, host, THREAD_POST, port, nflags, npri, groups, localspool, &hash) == 0) { ++NWriteServers; ++NWriteServAct; } else { ++ServersTerminated; } } } fclose(fi); ScanThreads(THREAD_SPOOL, 
setServerClosedRemovals); ScanThreads(THREAD_POST, setServerClosedRemovals); } } } /* * Log some spool server stats on an hourly basis */ void LogServerInfo(Connection *conn, int fd) { time_t now = time(NULL); if (!conn->co_LastServerLog) conn->co_LastServerLog = now; if (!conn->co_ServerByteCount || conn->co_LastServerLog + 300 > now) return; logit(LOG_INFO, "info server %s articles=%ld bytes=%ld", (conn->co_Desc->d_Id ? conn->co_Desc->d_Id : "UNKNOWN"), conn->co_ServerArticleCount, conn->co_ServerByteCount ); { DnsRes dr; dr.dr_ResultFlags = DR_SERVER_STATS; dr.dr_ArtCount = conn->co_ServerArticleCount; dr.dr_ByteCount = conn->co_ServerByteCount; SendMsg(fd, -1, &dr); } conn->co_ServerByteCount = 0; conn->co_ServerArticleCount = 0; conn->co_LastServerLog = now; } GroupList * makeGroupList(Connection *conn, char *groups) { GroupList *gr = NULL; GroupList *grStart = NULL; char *p; if (groups == NULL) return(NULL); for (p = strtok(groups, ","); p != NULL; p = strtok(NULL, ",")) { GroupList *g = zalloc(&conn->co_MemPool, sizeof(GroupList)); g->group = zallocStr(&conn->co_MemPool, p); g->next = NULL; if (gr != NULL) gr->next = g; gr = g; if (grStart == NULL) grStart = gr; } return(grStart); } int AddServer(char *bindinfo, int txbufsize, int rxbufsize, const char *host, int type, int port, int flags, int pri, char *groups, const char *localspool, HashFeed *hashFeed) { ForkDesc *desc; int fd; struct sockaddr_in sin; Connection *conn; /* * connect() to the host (use asynchronous connect()) */ bzero(&sin, sizeof(sin)); { struct hostent *he = gethostbyname(host); if (he != NULL) { sin.sin_family = he->h_addrtype; memmove(&sin.sin_addr, he->h_addr, he->h_length); } else if (strtol(host, NULL, 0) != 0) { sin.sin_family = AF_INET; sin.sin_addr.s_addr = inet_addr(host); } else { logit(LOG_ERR, "hostname lookup failure: %s\n", host); return(-1); } } sin.sin_port = htons(port); if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { logit(LOG_ERR, "socket() call failed on host %s\n", 
host); return(-1); } if (bindinfo != NULL) { struct hostent *he; struct sockaddr_in lsin; bzero(&lsin, sizeof(lsin)); if ((he = gethostbyname(bindinfo)) != NULL) { lsin.sin_addr = *(struct in_addr *)he->h_addr; } else { lsin.sin_addr.s_addr = inet_addr(bindinfo); if (lsin.sin_addr.s_addr == INADDR_NONE) { logit(LOG_ERR, "Unknown host for bindhost option: %s\n", bindinfo); return(-1); } } lsin.sin_family = AF_INET; lsin.sin_port = 0; if (bind(fd, (struct sockaddr *) &lsin, sizeof(lsin)) < 0) { logit(LOG_ERR, "failed to bind source address %s (%s)", bindinfo, strerror(errno)); return(-1); } } { int on = 1; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); if (txbufsize > 0) setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&txbufsize, sizeof(int)); if (rxbufsize > 0) setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&rxbufsize, sizeof(int)); } fcntl(fd, F_SETFL, O_NONBLOCK); /* asynchronous connect() */ errno = 0; if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) { if (errno != EINPROGRESS) { close(fd); logit(LOG_ERR, "connect() call failed on host %s: %s\n", host, strerror(errno)); return(-1); } } /* * add thread. Preset d_Count to LIMSIZE to prevent the server * from being allocated for client requests until we know * we have a good connection. */ desc = AddThread(host, fd, -1, type, -1, pri); FD_SET(desc->d_Fd, &RFds); conn = InitConnection(desc, NULL); conn->co_Flags |= flags; conn->co_Flags |= COF_INPROGRESS | COF_ININIT; desc->d_Count = THREAD_LIMSIZE; conn->co_Auth.dr_ReaderDef = NULL; conn->co_ListCacheGroups = makeGroupList(conn, groups); bcopy(hashFeed, &conn->co_RequestHash, sizeof(HashFeed)); if (localspool != NULL) desc->d_LocalSpool = zallocStr(&conn->co_MemPool, localspool); NNServerConnect(conn); if (DebugOpt) printf("Added Server (type=%d fd=%d pid=%d)\n", type, fd, (int)getpid()); return(0); } /* * QUEUESERVERREADREQUEST() - move requests to the appropriate server. 
* * New article fetch and post requests are placed on the SRead/SWrite * lists. This function moves as many of those requests as possible to * actual spool & post server queues. * * Even though a server can only handle one request at a time, we * allow up to N requests (THREAD_QSIZE/PSIZE) to be queued to an * actual server in order to judge load. If the load exceeds the * queue limit, FindLeastUsedThread will start queueing to higher priority * servers. */ void QueueServerRequests(void) { ServReq *sreq; while ((sreq = SReadBase) != NULL) { if ((SReadBase = sreq->sr_Next) == NULL) PSRead = &SReadBase; sreq->sr_Next = NULL; if (queueServerRequest(sreq, THREAD_SPOOL, THREAD_QSIZE) < 0) { *PSRead = sreq; PSRead = &sreq->sr_Next; break; } } while ((sreq = SWriteBase) != NULL) { if ((SWriteBase = sreq->sr_Next) == NULL) PSWrite = &SWriteBase; sreq->sr_Next = NULL; if (queueServerRequest(sreq, THREAD_POST, THREAD_PSIZE) < 0) { *PSWrite = sreq; PSWrite = &sreq->sr_Next; break; } } } int cbQueueReqFindThread(void *cbData, void *data) { int result; ServReq *sreq = cbData; Connection *conn = data; if (conn->co_RequestHash.hf_Mod != 0 && !HashFeedMatch(&conn->co_RequestHash, quickhash(sreq->sr_MsgId))) return(0); if (sreq->sr_Group == NULL || conn->co_ListCacheGroups == NULL) result = 1; else result = GroupFindWild(sreq->sr_Group, conn->co_ListCacheGroups); return(result); } /* * queueServerRequest() - find best server to queue request to, return 0 * on success, -1 if the request could not be queued. * * On success, the request will have been properly * queued. 
*/
int
queueServerRequest(ServReq *sreq, int type, int maxq)
{
    static int Randex1 = -1;
    ForkDesc *desc;

    desc = FindLeastUsedThread(type, maxq, 0, &Randex1, -1, cbQueueReqFindThread, sreq);
    if (desc == NULL) {
        /* sr_MsgId may be NULL; never hand a NULL pointer to %s */
        logit(LOG_ERR, "Unable to find any spools (or spool threads too busy) for request %s ", sreq->sr_MsgId ? sreq->sr_MsgId : "(null)");
        return(-1);
    }
    sreq->sr_NoPass = desc->d_Fd;
    queueDesc(sreq, desc);
    return(0);
}

/*
 * queueDesc() - append a request to a specific server's queue.
 *
 * The request is linked onto the end of the server connection's co_SReq
 * list, the descriptor's d_Count and the matching active counter
 * (NReadServAct for spool, NWriteServAct otherwise) are bumped, and
 * sr_Rolodex records the server's descriptor.  If the request became
 * the head of the queue the server is kicked via NNServerIdle().
 */
void
queueDesc(ServReq *sreq, ForkDesc *desc)
{
    Connection *conn = desc->d_Data;
    ServReq **psreq = &conn->co_SReq;

    /* find the end of the server's queue */
    while (*psreq)
        psreq = &(*psreq)->sr_Next;
    *psreq = sreq;
    sreq->sr_SConn = conn;

    ++desc->d_Count;
    if (desc->d_Type == THREAD_SPOOL)
        ++NReadServAct;
    else
        ++NWriteServAct;
    sreq->sr_Rolodex = desc->d_Fd;

    if (DebugOpt)
        printf("Request %s queued to fd %d\n", sreq->sr_MsgId ? sreq->sr_MsgId : "(null)", desc->d_Fd);

    /*
     * If server was idle, kick it.
     */
    if (psreq == &conn->co_SReq)
        NNServerIdle(conn);
}

/*
 * requeueServerRequest() - if request failed with previous server,
 *			    requeue for next server.
 *
 * This function requeues a request to another server given the previous
 * server.  It will attempt to send the request to all servers at the
 * current priority and then will go to the next priority level, and
 * so on, return -1 if the request could not be queued.
 *
 * When requeueing a request, queue limits are relaxed in order to
 * ensure that the request does not skip to higher priority queues
 * due to high load.
*/ int requeueServerRequest(Connection *conn, ServReq *sreq, int type, int maxq) { static int Randex2; ForkDesc *desc; /* * Find the next server */ desc = FindLeastUsedThread( type, maxq * 2, conn->co_Desc->d_Pri, &sreq->sr_Rolodex, sreq->sr_NoPass, cbQueueReqFindThread, sreq ); if (desc == NULL) { desc = FindLeastUsedThread( type, maxq * 2, conn->co_Desc->d_Pri + 1, &Randex2, -1, cbQueueReqFindThread, sreq ); if (desc) { sreq->sr_NoPass = desc->d_Fd; if (DebugOpt) printf("Requeue %s to nextpri %d\n", sreq->sr_MsgId, desc->d_Pri); } else { if (DebugOpt) printf("Requeue %s failed\n", sreq->sr_MsgId); } } else { if (DebugOpt) printf("Requeue %s to priority %d\n", sreq->sr_MsgId, desc->d_Pri); } if (desc) { queueDesc(sreq, desc); return(0); } return(-1); } /* * FreeSReq() */ void FreeSReq(ServReq *sreq) { /* * If cache file write in progress, abort it. allow the * fclose() to release the lock after the truncation. * * We NULL the FILE * out even though we free the structure * so potential memory corruption doesn't mess with random * (future) files. */ if (sreq->sr_Cache != NULL) { fflush(sreq->sr_Cache); AbortCache(fileno(sreq->sr_Cache), sreq->sr_MsgId, 0); fclose(sreq->sr_Cache); sreq->sr_Cache = NULL; } zfreeStr(&SysMemPool, &sreq->sr_Group); zfreeStr(&SysMemPool, &sreq->sr_MsgId); zfree(&SysMemPool, sreq, sizeof(ServReq)); } /* * NNServerRequest() * * NOTE: don't get confused by co_SReq, it serves two functions. It * placeholds a single request from a client in client Connection * structures, which this call handles, and placeholds MULTIPLE client * requests in server Connection structures. */ void NNServerRequest(Connection *conn, const char *grp, const char *msgid, FILE *cache, int req) { ServReq *sreq = zalloc(&SysMemPool, sizeof(ServReq)); sreq->sr_CConn = conn; sreq->sr_SConn = NULL; /* for clarity: not assigned to server yet */ sreq->sr_Time = time(NULL); sreq->sr_Group = grp ? zallocStr(&SysMemPool, grp) : NULL; sreq->sr_MsgId = msgid ? 
zallocStr(&SysMemPool, msgid) : NULL; sreq->sr_Cache = cache; /* may be NULL */ conn->co_SReq = sreq; /* client has active sreq */ FD_CLR(conn->co_Desc->d_Fd, &RFds); if (req == SREQ_RETRIEVE) { *PSRead = sreq; PSRead = &sreq->sr_Next; } else if (req == SREQ_POST) { *PSWrite = sreq; PSWrite = &sreq->sr_Next; } QueueServerRequests(); } /* * NNFinishSReq() - finish up an SReq, but requeue to a new server if requested * (i.e. article not found on old server). */ void NNFinishSReq(Connection *conn, const char *ctl, int requeue) { ServReq *sreq; if ((sreq = conn->co_SReq)) { /* * dequeue request from server side */ conn->co_SReq = sreq->sr_Next; sreq->sr_Next = NULL; if (conn->co_Desc) { --conn->co_Desc->d_Count; if (conn->co_Desc->d_Type == THREAD_POST) --NWriteServAct; else --NReadServAct; } conn->co_Flags &= ~COF_INPROGRESS; /* * if requeue requested, attempt to requeue. requeueServerRequest * returns 0 on success, -1 on failure, so we have to reverse the * sense of the return value. */ if (requeue) { if (conn->co_Desc->d_Type == THREAD_POST) requeue= requeueServerRequest(conn, sreq, THREAD_POST, THREAD_PSIZE); else requeue= requeueServerRequest(conn, sreq, THREAD_SPOOL,THREAD_QSIZE); requeue = !requeue; } /* * If no requeue, closeout the request. */ if (requeue == 0) { /* * can be NULL if client was terminated */ if (sreq->sr_CConn) { if (ctl) { if (ctl[1] == '4') { /* * x4x codes are POST results of some sort - * this result only appears here via NNPostResponseX. * we probably want to log this response to the * client as it may be an informative error message * unlike the average spool server transaction * which we probably don't care about. */ MBLogPrintf(sreq->sr_CConn, &sreq->sr_CConn->co_TMBuf, "%s", ctl); } else { MBWrite(&sreq->sr_CConn->co_TMBuf, ctl, strlen(ctl)); } } /* * set FCounter to 1 to prevent further recursion, which might * feed back and screw up the state machine for this * connection. 
*/ sreq->sr_CConn->co_FCounter = 1; sreq->sr_CConn->co_SReq = NULL; NNCommand(sreq->sr_CConn); } FreeSReq(sreq); } } /* * XXX what if NNCommand does something which hits a server which * kills the server ? boom, conn will bad illegal. XXX */ NNServerIdle(conn); } /* * we have to send a garbage command to prevent INN's nnrpd from timing * out in 60 seconds upon initial connect. */ void NNServerPrime1(Connection *conn) { if (conn->co_Flags & COF_MODEREADER) { MBPrintf(&conn->co_TMBuf, "mode reader\r\n"); NNServerPrime2(conn); } else if (conn->co_Flags & COF_READONLY) { MBPrintf(&conn->co_TMBuf, "mode readonly\r\n"); NNServerPrime4(conn); } else { MBPrintf(&conn->co_TMBuf, "mode thisbetterfail\r\n"); NNServerPrime3(conn); } } void NNServerPrime2(Connection *conn) { int len; char *ptr; conn->co_Func = NNServerPrime2; conn->co_State = "prime2"; if ((len = MBReadLine(&conn->co_RMBuf, &ptr)) != 0) { int code = strtol(ptr, NULL, 10); if (code >= 200 && code <= 299) { logit(LOG_INFO, "mode-reader(%s) %s", conn->co_Desc->d_Id, ptr); NNServerStart(conn); } else { logit(LOG_ERR, "mode-reader(%s) failed: %s", conn->co_Desc->d_Id, ptr); NNServerTerminate(conn); } } } void NNServerPrime3(Connection *conn) { int len; char *buf; conn->co_Func = NNServerPrime3; conn->co_State = "prime2"; /* * any return code or EOF. 0 means we haven't gotten the * return code yet. */ if ((len = MBReadLine(&conn->co_RMBuf, &buf)) != 0) { NNServerStart(conn); } } void NNServerPrime4(Connection *conn) { int len; char *ptr; conn->co_Func = NNServerPrime4; conn->co_State = "prime4"; if ((len = MBReadLine(&conn->co_RMBuf, &ptr)) != 0) { int code = strtol(ptr, NULL, 10); if (code >= 200 && code <= 299) logit(LOG_INFO, "mode-readonly(%s) %s", conn->co_Desc->d_Id, ptr); else logit(LOG_ERR, "mode-readonly(%s) failed: %s", conn->co_Desc->d_Id, ptr); NNServerStart(conn); } } /* * NNSERVERSTART() - start normal server operations. Clean up from connect * code, make server available to clients. 
*/
void
NNServerStart(Connection *conn)
{
    /* open the server up for requests and leave the startup state */
    conn->co_Desc->d_Count = 0;
    conn->co_Flags &= ~(COF_INPROGRESS|COF_ININIT);

    /* drop the active count held for the duration of the startup */
    if (conn->co_Desc->d_Type != THREAD_SPOOL)
        --NWriteServAct;
    else
        --NReadServAct;

    NNServerIdle(conn);
}

/*
 * NNSERVERIDLE() - server idle, wait for EOF or start next queued request.
 *		    If no more queued requests, try to requeue from unqueued
 *		    requests.
 */
void
NNServerIdle(Connection *conn)
{
    conn->co_Func = NNServerIdle;
    conn->co_State = "sidle";

    if (!(conn->co_Flags & COF_CLOSESERVER)) {
        int n;
        char *line;

        /*
         * Ooops, not really idle.  This can happen due to the recursive
         * nature of much of the code.
         */
        if (conn->co_Flags & COF_INPROGRESS)
            return;

        /*
         * Check for an unexpected condition on server, i.e. data or
         * EOF on the input where we didn't expect any.
         */
        n = MBReadLine(&conn->co_RMBuf, &line);
        if (n != 0) {
            if (n > 1) {
                line[n - 2] = 0;
                logit(LOG_ERR, "Server closed connection: %s: %s", conn->co_Desc->d_Id, line);
            } else {
                logit(LOG_ERR, "Server closed connection: %s", conn->co_Desc->d_Id);
            }
            conn->co_Flags |= COF_CLOSESERVER;
        }
    }

    /* server is going away */
    if (conn->co_Flags & COF_CLOSESERVER) {
        conn->co_Desc->d_Count = THREAD_LIMSIZE;
        NNServerTerminate(conn);
        return;
    }

    /* nothing queued here: see whether pending requests can be placed */
    if (conn->co_SReq == NULL) {
        QueueServerRequests();
        return;
    }

    /* start the request at the head of our queue */
    conn->co_Flags |= COF_INPROGRESS;
    if (conn->co_Desc->d_Type == THREAD_POST)
        NNPostCommand1(conn);
    else
        NNSpoolCommand1(conn);
}

/*
 * NNServerTerminate() - terminate a server connection.  Usually occurs
 *			 if the server goes down or the related process
 *			 on the server is killed.  Any client request
 *			 queued to the server is requeued to another
 *			 server.
*/
void
NNServerTerminate(Connection *conn)
{
    ServReq *sreq;

    /*
     * Move every request queued to this server back onto the pending
     * list for its type, undoing the per-request accounting.
     */
    while ((sreq = conn->co_SReq) != NULL) {
        conn->co_SReq = sreq->sr_Next;
        --conn->co_Desc->d_Count;
        sreq->sr_Next = NULL;
        sreq->sr_SConn = NULL;
        sreq->sr_Time = time(NULL);

        if (conn->co_Desc->d_Type == THREAD_POST) {
            *PSWrite = sreq;
            PSWrite = &sreq->sr_Next;
            --NWriteServAct;
        } else {
            *PSRead = sreq;
            PSRead = &sreq->sr_Next;
            --NReadServAct;
        }
    }

    /*
     * If we bumped the active count for the duration of the startup and
     * terminated prior to the startup completing, we have to fix it here.
     */
    if (conn->co_Flags & COF_ININIT) {
        conn->co_Flags &= ~COF_ININIT;
        if (conn->co_Desc->d_Type == THREAD_POST)
            --NWriteServAct;
        else
            --NReadServAct;
    }

    /*
     * closeup the server.  If d_Count is non-zero we have
     * to cleanup our reference counts, but then we set d_Count
     * to ensure nothing else gets queued to the server
     * between calling NNTerminate() and the actual termination.
     */
    conn->co_Flags |= COF_CLOSESERVER;
    if (conn->co_Desc->d_Type == THREAD_POST)
        --NWriteServers;
    else
        --NReadServers;
    ++ServersTerminated;		/* flag for rescan/reopen */
    conn->co_Desc->d_Count = THREAD_LIMSIZE;
    conn->co_Flags |= COF_INPROGRESS;

    QueueServerRequests();		/* try to requeue requests */
    NNTerminate(conn);
}

/*
 * NNSERVERCONNECT() - initial connection, get startup message.  The
 *		       server is not made available to clients until
 *		       the startup message has been received and the
 *		       priming exchange has completed.
 *
 * A failed read, or no banner within 60 seconds of the session start,
 * terminates the server.  Only 200 and 201 banners are accepted.
 */
void
NNServerConnect(Connection *conn)
{
    char *buf;
    int len;
    int code;

    conn->co_Func = NNServerConnect;
    conn->co_State = "sconn";

    if ((len = MBReadLine(&conn->co_RMBuf, &buf)) <= 0) {
        /*
         * Terminate at most once: a connection that both failed and
         * timed out must not be torn down twice.
         */
        if (len < 0) {
            logit(LOG_ERR, "connect(%s) failed", conn->co_Desc->d_Id);
            NNServerTerminate(conn);
        } else if ((conn->co_SessionStartTime + 60) < CurTime.tv_sec) {
            logit(LOG_ERR, "connect(%s) timed out", conn->co_Desc->d_Id);
            NNServerTerminate(conn);
        }
        return;
    }

    code = strtol(buf, NULL, 10);
    if (code != 200 && code != 201) {
        logit(LOG_ERR, "connect(%s) unrecognized banner: %s", conn->co_Desc->d_Id, buf);
        NNServerTerminate(conn);
        return;
    }
    logit(LOG_INFO, "connect(%s) %s", conn->co_Desc->d_Id, buf);
    NNServerPrime1(conn);
}

/*
 * replyNoArticle() - tell the client the requested article does not
 *		      exist, in the form appropriate for the command
 *		      and fetch type in progress.
 */
static void
replyNoArticle(Connection *conn)
{
    if (conn->co_ArtMode == COM_BODYNOSTAT)
        MBLogPrintf(conn, &conn->co_TMBuf, "(article not available)\r\n.\r\n");
    else if (conn->co_RequestFlags == ARTFETCH_ARTNO)
        MBLogPrintf(conn, &conn->co_TMBuf, "423 No such article number in this group\r\n");
    else
        MBLogPrintf(conn, &conn->co_TMBuf, "430 No such article\r\n");
}

/*
 * NNARTICLERETRIEVEBYMESSAGEID() - retrieve article by message-id
 *
 *	Retrieve an article by its message id and write the article
 *	to the specified connection.
 *
 *	(a) attempt to fetch the article from cache (positive or negative hit)
 *
 *	(b) initiate the state machine to attempt to fetch the article from a
 *	    remote server.
 *
 *	(c) on remote server fetch completion, cache the article locally
 *
 *	(d) place article in transmission buffer for connection, transmitting
 *	    it, then return to the command state.
 *
 *	COM_ARTICLEWVF	retrieve article from remote by message-id but retrieve
 *			headers by co_ArtNo.
 *
 *	COM_ARTICLE	retrieve entire article from remote by message-id
 *
 *	COM_...
 *
 *	grouphint, when non-zero, passes the connection's current group
 *	name along with the request.
 */
void
NNArticleRetrieveByMessageId(Connection *conn, const char *msgid, int grouphint)
{
    FILE *cache = NULL;

    /*
     * (a) retrieve from cache if caching is enabled
     */
    if (DOpts.ReaderCacheMode) {
        int valid;
        int size;
        int cfd;

        valid = OpenCache(msgid, &cfd, &size);

        if (valid > 0) {
            /*
             * good cache
             */
            const char *map;

            if (DebugOpt)
                printf("good cache\n");

            if ((map = xmap(NULL, size, PROT_READ, MAP_SHARED, cfd, 0)) != NULL) {
                xadvise(map, ((size < 65536) ? size : 65536), XADV_WILLNEED);
                if (conn->co_ArtMode != COM_BODYNOSTAT) {
                    MBLogPrintf(conn, &conn->co_TMBuf, "%03d %d %s %s\r\n",
                        GoodRC(conn),
                        ((conn->co_ArtMode==COM_ARTICLEWVF)?conn->co_ArtNo:0),
                        msgid,
                        GoodResId(conn)
                    );
                }
                if (conn->co_ArtMode != COM_STAT) {
                    DumpArticleFromCache(conn, map, size);
                    MBPrintf(&conn->co_TMBuf, ".\r\n");
                }
                xunmap((void *)map, size);
            } else {
                /* cache hit that cannot be mapped: report not found */
                replyNoArticle(conn);
            }
            close(cfd);
            NNCommand(conn);
            return;
        } else if (valid == 0) {
            /*
             * not in cache, cache file created if cfd >= 0
             */
            if (DebugOpt)
                printf("bad cache\n");
            if (cfd >= 0) {
                cache = fdopen(cfd, "w");
                if (cache == NULL) {
                    /*
                     * No stream, so nothing will ever own cfd: abort
                     * the cache entry and close the descriptor the same
                     * way FreeSReq() does for an unfinished write.
                     * NOTE(review): assumes AbortCache() with a final
                     * argument of 0 leaves the descriptor open -- confirm.
                     */
                    AbortCache(cfd, msgid, 0);
                    close(cfd);
                }
            }
            /* fall through */
        } else {
            /*
             * negatively cached
             */
            if (DebugOpt)
                printf("neg cache\n");
            replyNoArticle(conn);
            NNCommand(conn);
            return;
        }
    }

    /*
     * (b)(c)(d)
     */
    NNServerRequest(conn, grouphint ? conn->co_GroupName : NULL, msgid, cache, SREQ_RETRIEVE);
    NNWaitThread(conn);
}