1 // ------------------------------------------------
6 // Servents are the actual connections between clients. They do the handshaking,
7 // transferring of data and processing of GnuPackets. Each servent has one socket allocated
8 // to it on connect; it uses this socket to transfer all of its data.
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
43 #include "win32/seh.h"
// Judging by its name, a timeout for direct writes. NOTE(review): neither the
// unit nor the use site is visible in this excerpt (presumably seconds) -- confirm.
46 const int DIRECT_WRITE_TIMEOUT = 60;
48 // -----------------------------------
// Table of C strings belonging to Servent. NOTE(review): the initializer is not
// visible in this excerpt; presumably one display string per STATUS value -- confirm.
49 char *Servent::statusMsgs[]=
66 // -----------------------------------
// Table of C strings belonging to Servent. NOTE(review): the initializer is not
// visible in this excerpt; presumably one display string per TYPE value -- confirm.
67 char *Servent::typeMsgs[]=
78 // -----------------------------------
// Returns true when the host `h` matches the F_PRIVATE filter held by servMgr,
// or when `h` is the local host.
// (`h` is declared on a line that is not shown in this excerpt.)
79 bool Servent::isPrivate()
82 return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
84 // -----------------------------------
// Reports whether this servent is allowed the capability `a`; the init*()
// functions in this file call it with ALLOW_NETWORK / ALLOW_DATA and refuse to
// proceed when it returns false.
// Only the ban check is visible here: host `h` is tested against the F_BAN
// filter. NOTE(review): the result of that test and the handling of `a` are on
// lines not shown -- confirm before relying on this description.
85 bool Servent::isAllowed(int a)
89 if (servMgr->isFiltered(ServFilter::F_BAN,h))
95 // -----------------------------------
// Forwards to servMgr->isFiltered() with the filter flag(s) `f` and the host `h`
// (`h` is declared on a line not shown in this excerpt) and returns its result.
96 bool Servent::isFiltered(int f)
99 return servMgr->isFiltered(f,h);
// Running counter from which every new Servent takes its servent_id; starts at 1.
102 int servent_count = 1;
103 // -----------------------------------
// Constructs a servent. Both outgoing packet queues are created with
// MAX_OUTPACKETS, and the servent takes the current value of servent_count as
// its servent_id (post-increment, so ids follow construction order).
// NOTE(review): servent_count is a plain int and nothing visible here
// synchronizes the increment -- confirm constructors never run concurrently.
// NOTE(review): how `index` is used is not visible in this excerpt.
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
113 servent_id = servent_count++;
119 // -----------------------------------
124 // -----------------------------------
127 thread.active = false;
129 setStatus(S_CLOSING);
133 PCPStream *pcp = pcpStream;
139 chanMgr->hitlistlock.on();
140 ChanHitList *chl = chanMgr->findHitListByID(chanID);
142 ChanHit *chh = chl->hit;
143 ChanHit *prev = NULL;
145 if (chh->servent_id == this->servent_id){
146 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
148 chh->numListeners = 0;
153 ChanHit *next = chh->next;
168 chanMgr->hitlistlock.off();
186 if (type != T_SERVER)
193 // -----------------------------------
// Asks this servent to stop. The visible step clears thread.active, which the
// processing loops in this file test as their run condition.
// NOTE(review): any further steps are on lines not shown in this excerpt.
194 void Servent::abort()
196 thread.active = false;
204 // -----------------------------------
// Returns the servent's per-connection state to defaults. Visible here:
//  - output protocol back to SP_UNKNOWN,
//  - the three activity timestamps zeroed,
//  - the stored login password cleared and the priority flag dropped,
//  - both outgoing packet queues reset.
// NOTE(review): other fields are reset on lines not shown in this excerpt.
205 void Servent::reset()
219 outputProtocol = ChanInfo::SP_UNKNOWN;
228 lastConnect = lastPing = lastPacket = 0;
229 loginPassword.clear();
232 priorityConnect = false;
236 outPacketsNorm.reset();
237 outPacketsPri.reset();
248 // -----------------------------------
// Sends `pack` through this servent's PCP stream when the servent matches the
// given selectors; on a match the result of pcpStream->sendPacket(pack,did) is
// returned.
// Visible parts of the match condition:
//  - `cid` is unset, or equals this servent's chanID;
//  - `sid` is unset, or is different from remoteID (i.e. `sid` names a peer to
//    be excluded);
//  - a PCP stream exists.
// NOTE(review): the first term(s) of the condition -- presumably involving `t` --
// and the value returned when nothing matches are on lines not shown.
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
254 && (!cid.isSet() || chanID.isSame(cid))
255 && (!sid.isSet() || !sid.isSame(remoteID))
256 && (pcpStream != NULL)
259 return pcpStream->sendPacket(pack,did);
265 // -----------------------------------
266 bool Servent::acceptGIV(ClientSocket *givSock)
276 // -----------------------------------
277 Host Servent::getHost()
287 // -----------------------------------
// Queues the Gnutella packet `p` for sending. Two queues are written in the
// visible code: outPacketsPri and outPacketsNorm; `r` holds the write result.
// NOTE(review): the branch structure is not fully visible -- presumably `pri`
// selects the priority queue, and `r` is what gets returned -- confirm.
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
294 r = outPacketsPri.write(p);
// Flow control is only considered when it is enabled in servMgr.
297 if (servMgr->useFlowControl)
// Fill level of the normal queue; the thresholds it is compared against are
// on lines not shown.
299 int per = outPacketsNorm.percentFull();
310 // if in flowcontrol, only allow packets with less of a hop count than already in queue
311 if (p.hops >= outPacketsNorm.findMinHop())
316 r = outPacketsNorm.write(p);
323 // -----------------------------------
// Starts this servent as a server: serverProc is installed as the thread entry
// point and the thread is started via sys->startThread().
// A failed start raises StreamException("Can`t start thread"), which is caught
// in this function and logged as "Bad server: ...".
// NOTE(review): the use of `h`, the rest of the catch block and the returned
// values are on lines not shown in this excerpt.
324 bool Servent::initServer(Host &h)
338 thread.func = serverProc;
342 if (!sys->startThread(&thread))
343 throw StreamException("Can`t start thread");
345 }catch(StreamException &e)
347 LOG_ERROR("Bad server: %s",e.msg);
354 // -----------------------------------
// Guard used before (re)initialising a servent. Throws StreamException with
// "Socket already set" or "Thread already active".
// NOTE(review): the two conditions that trigger the throws are on lines not
// shown; the messages suggest a non-null socket and an active thread.
355 void Servent::checkFree()
358 throw StreamException("Socket already set");
360 throw StreamException("Thread already active");
362 // -----------------------------------
// Sets this servent up to serve an incoming connection: incomingProc becomes the
// thread entry point, thread.finish is cleared, the status moves to S_PROTOCOL,
// the remote address is logged, and the thread is started.
// A failed start raises StreamException("Can`t start thread"), which is caught
// here and logged as "INCOMING FAILED: ...".
// NOTE(review): how `s` and `a` are stored, and whatever else the catch block
// does, are on lines not shown in this excerpt.
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
374 thread.func = incomingProc;
375 thread.finish = false;
377 setStatus(S_PROTOCOL);
380 sock->host.toStr(ipStr);
381 LOG_DEBUG("Incoming from %s",ipStr);
383 if (!sys->startThread(&thread))
384 throw StreamException("Can`t start thread");
385 }catch(StreamException &e)
// The two lines below are disabled: a failure here used to be treated as fatal
// and scheduled a shutdown.
387 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
388 //servMgr->shutdownTimer = 1;
391 LOG_ERROR("INCOMING FAILED: %s",e.msg);
396 // -----------------------------------
// Starts an outgoing connection thread: outgoingProc is installed as the thread
// entry point and the thread is started.
// A failed start raises StreamException("Can`t start thread"), which is caught
// here and logged as "Unable to start outgoing: ...".
// NOTE(review): how `ty` is applied is on lines not shown in this excerpt.
397 void Servent::initOutgoing(TYPE ty)
407 thread.func = outgoingProc;
409 if (!sys->startThread(&thread))
410 throw StreamException("Can`t start thread");
412 }catch(StreamException &e)
414 LOG_ERROR("Unable to start outgoing: %s",e.msg);
419 // -----------------------------------
// Starts an outgoing PCP connection. The servent must pass
// isAllowed(ALLOW_NETWORK), otherwise StreamException("Servent not allowed") is
// thrown. outgoingProc is then installed as the thread entry point, the target
// is logged and the thread is started.
// Both that exception and a failed thread start are caught in this function and
// logged as "Unable to open connection to <ip> - <msg>".
// NOTE(review): socket creation and the use of `rh` are on lines not shown.
420 void Servent::initPCP(Host &rh)
434 if (!isAllowed(ALLOW_NETWORK))
435 throw StreamException("Servent not allowed");
438 thread.func = outgoingProc;
440 LOG_DEBUG("Outgoing to %s",ipStr);
442 if (!sys->startThread(&thread))
443 throw StreamException("Can`t start thread");
445 }catch(StreamException &e)
447 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
453 // -----------------------------------
// Prepares this servent to fetch channel data from `host`. The visible check
// requires isAllowed(ALLOW_DATA) and throws StreamException("Servent not
// allowed") otherwise.
// NOTE(review): the remaining setup is on lines not shown in this excerpt, and
// no catch is visible here, so the exception presumably reaches the caller --
// confirm.
454 void Servent::initChannelFetch(Host &host)
468 if (!isAllowed(ALLOW_DATA))
469 throw StreamException("Servent not allowed");
475 // -----------------------------------
// Starts a GIV (push) connection thread towards `h`. The servent must pass
// isAllowed(ALLOW_NETWORK), otherwise StreamException("Servent not allowed") is
// thrown. givProc is installed as the thread entry point and the thread started.
// Both that exception and a failed thread start are caught in this function and
// logged as "GIV error to <ip>: <msg>".
// NOTE(review): storing `id` (givProcMain later reads givID) and socket setup
// are on lines not shown in this excerpt.
476 void Servent::initGIV(Host &h, GnuID &id)
490 if (!isAllowed(ALLOW_NETWORK))
491 throw StreamException("Servent not allowed");
496 thread.func = givProc;
500 if (!sys->startThread(&thread))
501 throw StreamException("Can`t start thread");
503 }catch(StreamException &e)
505 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
509 // -----------------------------------
// Obtains a new socket from sys->createSocket() and stores it in `sock`.
// An error is logged when the call is made "while active".
// NOTE(review): the condition guarding the log line is not shown, and it is not
// visible whether the socket is still replaced after the error is logged --
// confirm before relying on either behaviour.
510 void Servent::createSocket()
513 LOG_ERROR("Servent::createSocket attempt made while active");
515 sock = sys->createSocket();
517 // -----------------------------------
// Changes the servent status to `s`. When the new status is S_HANDSHAKE,
// S_CONNECTED or S_LISTENING, the current time is recorded in lastConnect.
// NOTE(review): the assignment of the status itself and any other bookkeeping
// are on lines not shown in this excerpt.
518 void Servent::setStatus(STATUS s)
524 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
525 lastConnect = sys->getTime();
530 // -----------------------------------
// Performs the client side of the connection handshake on `sock`.
//
// Request: writes GNU_PEERCONN followed by the agent, PCP, priority, (optional)
// network-ID, session-ID and OS headers.
// Response: reads the HTTP status; a status other than the expected 200 is
// logged and raises StreamException("Unexpected HTTP response"). The response
// headers are then scanned:
//  - agent header: a "PeerCast/" agent is accepted when the text after the
//    prefix compares >= MIN_CONNECTVER (case-insensitive string compare);
//  - network-ID header: parsed into clientID.
// Afterwards a clientID different from networkID raises HTTPException 503, and
// HTTPException 401 is raised on a second check (presumably !versionValid -- the
// condition is on a line not shown). On success GNU_OK is written back.
// NOTE(review): several lines of this function are not visible in this excerpt.
531 void Servent::handshakeOut()
533 sock->writeLine(GNU_PEERCONN);
537 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
538 sock->writeLineF("%s %d",PCX_HS_PCP,1);
541 sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
543 if (networkID.isSet())
545 networkID.toStr(str);
546 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
549 servMgr->sessionID.toStr(str);
550 sock->writeLineF("%s %s",PCX_HS_ID,str);
553 sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
559 int r = http.readResponse();
563 LOG_ERROR("Expected 200, got %d",r);
564 throw StreamException("Unexpected HTTP response");
568 bool versionValid = false;
573 while (http.nextHeader())
// The header line comes from the remote peer, so it must be passed as an
// argument and never as the format string itself (a '%' in the line would
// otherwise be interpreted as a conversion). handshakeIn logs it the same way.
575 LOG_DEBUG("%s",http.cmdLine);
577 char *arg = http.getArgStr();
581 if (http.isHeader(HTTP_HS_AGENT))
585 if (strnicmp(arg,"PeerCast/",9)==0)
586 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
587 }else if (http.isHeader(PCX_HS_NETWORKID))
588 clientID.fromStr(arg);
591 if (!clientID.isSame(networkID))
592 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
595 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
598 sock->writeLine(GNU_OK);
604 // -----------------------------------
605 void Servent::processOutChannel()
610 // -----------------------------------
// Performs the server side of the connection handshake on `sock`.
//
// Request headers handled in the visible code:
//  - agent: for a "PeerCast/" agent, versionValid is set when the version text
//    compares >= MIN_CONNECTVER, and diffRootVer when it compares < MIN_ROOTVER
//    (both are case-insensitive string comparisons);
//  - network ID: parsed into clientID;
//  - priority: sets priorityConnect when the value is non-zero;
//  - session ID: an ID equal to our own sessionID raises
//    StreamException("Servent loopback");
//  - OS: matched against the known PCX_OS_* names.
// Rejections: network ID mismatch -> HTTPException 503; servMgr->pubInFull()
// -> HTTPException 503; a further check -> HTTPException 403.
// Reply: GNU_OK, the agent header, the network ID when set, flow-control and
// broadcast-TTL settings, download URL, root message and the remote IP.
// NOTE(review): many conditions guarding these steps are on lines not shown in
// this excerpt -- confirm details against the full file.
611 void Servent::handshakeIn()
619 bool versionValid = false;
620 bool diffRootVer = false;
625 while (http.nextHeader())
627 LOG_DEBUG("%s",http.cmdLine);
629 char *arg = http.getArgStr();
633 if (http.isHeader(HTTP_HS_AGENT))
637 if (strnicmp(arg,"PeerCast/",9)==0)
639 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
640 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
642 }else if (http.isHeader(PCX_HS_NETWORKID))
644 clientID.fromStr(arg);
646 }else if (http.isHeader(PCX_HS_PRIORITY))
648 priorityConnect = atoi(arg)!=0;
650 }else if (http.isHeader(PCX_HS_ID))
654 if (id.isSame(servMgr->sessionID))
655 throw StreamException("Servent loopback");
657 }else if (http.isHeader(PCX_HS_OS))
659 if (stricmp(arg,PCX_OS_LINUX)==0)
661 else if (stricmp(arg,PCX_OS_WIN32)==0)
663 else if (stricmp(arg,PCX_OS_MACOSX)==0)
665 else if (stricmp(arg,PCX_OS_WINAMP2)==0)
671 if (!clientID.isSame(networkID))
672 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
674 // if this is a priority connection and all incoming connections
675 // are full then kill an old connection to make room. Otherwise reject connection.
// NOTE(review): the priority test below is commented out, so the visible code
// only shows the reject path when pubInFull() is true.
676 //if (!priorityConnect)
679 if (servMgr->pubInFull())
680 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
684 throw HTTPException(HTTP_SC_FORBIDDEN,403);
686 sock->writeLine(GNU_OK);
688 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
690 if (networkID.isSet())
693 networkID.toStr(idStr);
694 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
699 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
700 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
701 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
702 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
703 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
708 sock->writeString(PCX_HS_DL);
709 sock->writeLine(PCX_DL_URL);
712 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
719 sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
// Empty-bodied loop: reads and discards the remaining headers.
725 while (http.nextHeader());
728 // -----------------------------------
// Connects to `rhost` and checks that the peer answering there reports the
// session ID `rsid`.
//
// Visible steps: a socket is created with read and write timeouts of 15000
// (NOTE(review): unit not shown, presumably milliseconds); PCP_CONNECT and a
// HELO carrying our session ID are sent; the reply's children are scanned for
// PCP_SESSIONID, which is read into `sid`. An unexpected reply raises
// StreamException("Bad ping response"), a differing session ID raises
// StreamException("SIDs don`t match"). On success PCP_QUIT is sent.
// StreamExceptions are caught in this function and logged at debug level.
// NOTE(review): the returned values and the socket cleanup are on lines not
// shown in this excerpt.
729 bool Servent::pingHost(Host &rhost,GnuID &rsid)
733 LOG_DEBUG("Ping host %s: trying..",ipstr);
734 ClientSocket *s=NULL;
738 s = sys->createSocket();
744 s->setReadTimeout(15000);
745 s->setWriteTimeout(15000);
751 atom.writeInt(PCP_CONNECT,1);
752 atom.writeParent(PCP_HELO,1);
753 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
759 ID4 id = atom.read(numc,numd);
762 for(int i=0; i<numc; i++)
765 ID4 pid = atom.read(c,d);
766 if (pid == PCP_SESSIONID)
767 atom.readBytes(sid.id,16,d);
773 LOG_DEBUG("Ping response: %s",id.getString().str());
774 throw StreamException("Bad ping response");
777 if (!sid.isSame(rsid))
778 throw StreamException("SIDs don`t match");
781 LOG_DEBUG("Ping host %s: OK",ipstr);
782 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
786 }catch(StreamException &e)
788 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
804 // -----------------------------------
// Answers a stream request for the channel described by `chanInfo`.
//
// Stages visible in this excerpt:
//  1. Parse request headers (PCP flag, requested position, listen port,
//     icy-metadata, agent, Pragma).
//  2. Adjust outputProtocol: PCP falls back to SP_PEERCAST when the client did
//     not send the PCP header; HTTP becomes SP_MMS for MMS sources and
//     WMA/WMV/ASX content.
//  3. Look the channel up, pick the start position, fill in serventHit and
//     decide whether the channel can be streamed (canStream), optionally after
//     kicking an unrelayable host.
//  4. Optional blocking of specific peers (v0.119 agents, firewalled hosts
//     with no relays) when servMgr->isCheckPushStream() is set.
//  5. Reply: "not found", or "unavailable" (for PCP clients this includes
//     alternative hits / trackers and a QUIT atom), or the success headers
//     for the selected protocol; PCP clients then complete the atom handshake
//     and are registered as a hit.
// NOTE(review): a large share of this function (branch conditions, braces,
// return statements) is on lines not shown in this excerpt; the comments
// below describe only what the visible lines do.
805 bool Servent::handshakeStream(ChanInfo &chanInfo)
812 unsigned int reqPos=0;
813 unsigned short listenPort = 0;
817 while (http.nextHeader())
819 char *arg = http.getArgStr();
823 if (http.isHeader(PCX_HS_PCP))
824 gotPCP = atoi(arg)!=0;
825 else if (http.isHeader(PCX_HS_POS))
827 else if (http.isHeader(PCX_HS_PORT))
828 listenPort = (unsigned short)atoi(arg);
829 else if (http.isHeader("icy-metadata"))
830 addMetadata = atoi(arg) > 0;
831 else if (http.isHeader(HTTP_HS_AGENT))
833 else if (http.isHeader("Pragma"))
835 char *ssc = stristr(arg,"stream-switch-count=");
836 char *so = stristr(arg,"stream-offset");
841 //nsSwitchNum = atoi(ssc+20);
845 LOG_DEBUG("Stream: %s",http.cmdLine);
// A client that asked for PCP output without sending the PCP header is
// downgraded to the older PeerCast protocol.
849 if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
850 outputProtocol = ChanInfo::SP_PEERCAST;
852 if (outputProtocol == ChanInfo::SP_HTTP)
854 if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
855 || (chanInfo.contentType == ChanInfo::T_WMA)
856 || (chanInfo.contentType == ChanInfo::T_WMV)
857 || (chanInfo.contentType == ChanInfo::T_ASX)
859 outputProtocol = ChanInfo::SP_MMS;
863 bool chanFound=false;
864 bool chanReady=false;
866 ChanHit *sourceHit = NULL;
868 Channel *ch = chanMgr->findChannelByID(chanInfo.id);
872 if (reqPos || !isIndexTxt(&chanInfo))
874 streamPos = ch->rawData.findOldestPos(reqPos);
875 //streamPos = ch->rawData.getLatestPos();
878 streamPos = ch->rawData.getLatestPos();
881 chanID = chanInfo.id;
882 serventHit.host.ip = getHost().ip;
883 serventHit.host.port = listenPort;
// The requester's address goes into rhost[0] when it is a global IP; the
// rhost[1] assignment below is presumably the else arm (line not shown).
884 if (serventHit.host.globalIP())
885 serventHit.rhost[0] = serventHit.host;
887 serventHit.rhost[1] = serventHit.host;
888 serventHit.chanID = chanID;
891 chanReady = canStream(ch);
// The leading `0 &&` makes this condition always false, so the branch that
// hands over the source host ("bump") is disabled.
892 if (0 && !chanReady && ch->isPlaying())
894 if (ch->info.getUptime() > 60
895 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
897 sourceHit = &ch->sourceHost; // send source host info
901 // connect "this" host later
902 chanMgr->addHit(serventHit);
906 getHost().toStr(tmp);
907 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
910 else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
912 chanReady = canStream(ch);
914 LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
// When the channel cannot be streamed the servent is retyped as T_INCOMING and
// its thread is marked inactive (thread.active mirrors chanReady).
917 if (!chanReady) type = T_INCOMING;
918 thread.active = chanReady;
919 setStatus(S_CONNECTED);
921 channel_id = ch->channel_id;
924 if (servMgr->isCheckPushStream())
926 if (chanReady == true)
930 if (!h.isLocalhost())
// Matches on a substring of the agent string.
934 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL)
938 LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
// NOTE(review): the next line opens a block comment around an older
// hit-table scan; its closing marker is not visible in this excerpt.
943 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
945 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
947 ChanHitList *chl = &chanMgr->hitlists[i];
949 hits[numHits++] = chl;
953 for(int i=0; i<numHits; i++)
955 ChanHitList *chl = hits[i];
958 for (int j=0; j<ChanHitList::MAX_HITS; j++)
960 ChanHit *hit = &chl->hits[j];
961 if (hit->host.isValid() && (h.ip == hit->host.ip))
965 numRelay = hit->numRelays;
970 if ((isfw == true) && (numRelay == 0))
974 LOG_ERROR("Block firewalled Servent : %s",strip);
978 ChanHitList *chl = chanMgr->findHitList(chanInfo);
// `hit` is NULL when no hit list exists. NOTE(review): the guard before the
// dereference below (presumably a loop over the list) is on a line not shown.
979 ChanHit *hit = (chl ? chl->hit : NULL);
981 if (hit->host.isValid() && (h.ip == hit->host.ip))
983 if ((hit->firewalled) && (hit->numRelays == 0)){
986 LOG_ERROR("Block firewalled Servent : %s",strip);
999 // LockBlock lockblock(chanMgr->hitlistlock);
1001 // lockblock.lockon();
1002 ChanHitList *chl = chanMgr->findHitList(chanInfo);
1010 bool result = false;
1013 chanInfo.id.toStr(idStr);
1016 servMgr->sessionID.toStr(sidStr);
1018 Host rhost = sock->host;
1023 AtomStream atom(*sock);
1029 sock->writeLine(HTTP_SC_NOTFOUND);
1030 sock->writeLine("");
1031 LOG_DEBUG("Sending channel not found");
1038 if (outputProtocol == ChanInfo::SP_PCP)
// The HTTP reply is assembled in tbuf through a MemoryStream and sent with a
// single socket write.
1042 MemoryStream mem(tbuf, sizeof(tbuf));
1043 mem.writeLine(HTTP_SC_UNAVAILABLE);
1044 mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1046 sock->write(tbuf, mem.getPosition());
1048 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1051 rhost.toStr(ripStr);
1053 LOG_DEBUG("Sending channel unavailable");
1058 AtomStream atom2(mem);
1060 int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1063 sourceHit->writeAtoms(atom2,chanInfo.id);
1065 sourceHit->host.toStr(tmp);
1066 LOG_DEBUG("relay info(sourceHit): %s", tmp);
// The hit list lock is held from here until the matching off() call further
// down, covering the hit/tracker selection.
1069 chanMgr->hitlistlock.on();
1071 chl = chanMgr->findHitList(chanInfo);
1073 if (chl && !sourceHit)
1077 // search for up to 8 other hits
1085 // find best hit this network if local IP
1086 if (!rhost.globalIP())
1089 chs.matchHost = servMgr->serverHost;
1091 chs.excludeID = remoteID;
1092 if (chl->pickHits(chs)){
1094 LOG_DEBUG("find best hit this network if local IP");
1098 // find best hit on same network
1102 chs.matchHost = rhost;
1104 chs.excludeID = remoteID;
1105 if (chl->pickHits(chs)){
1107 LOG_DEBUG("find best hit on same network");
1112 // find best hit on other networks
// NOTE(review): the next line opens a block comment; its closing marker is not
// visible in this excerpt, so the extent of the disabled code is unknown here.
1113 /* if (!best.host.ip)
1117 chs.excludeID = remoteID;
1118 if (chl->pickHits(chs)){
1120 LOG_DEBUG("find best hit on other networks");
1128 best.writeAtoms(atom2,chanInfo.id);
1134 // chanMgr->hitlistlock.on();
1135 int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1136 // chanMgr->hitlistlock.off();
1137 for (int i = 0; i < rhcnt; i++){
1138 chs.best[i].writeAtoms(atom2, chanInfo.id);
1139 chs.best[i].host.toStr(tmp);
1140 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1141 best.host.ip = chs.best[i].host.ip;
1148 LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1151 else if (rhost.port)
1153 // find firewalled host
1156 chs.useFirewalled = true;
1157 chs.excludeID = remoteID;
1158 if (chl->pickHits(chs))
1161 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1162 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1166 // if all else fails, use tracker
1169 // find best tracker on this network if local IP
1170 if (!rhost.globalIP())
1173 chs.matchHost = servMgr->serverHost;
1174 chs.trackersOnly = true;
1175 chs.excludeID = remoteID;
1176 if (chl->pickHits(chs))
1181 // find local tracker
1185 chs.matchHost = rhost;
1186 chs.trackersOnly = true;
1187 chs.excludeID = remoteID;
1188 if (chl->pickHits(chs))
1192 // find global tracker
1196 chs.trackersOnly = true;
1197 chs.excludeID = remoteID;
1198 if (chl->pickHits(chs))
1204 best.writeAtoms(atom2,chanInfo.id);
1205 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1206 }else if (rhost.port)
1208 // find firewalled tracker
1210 chs.useFirewalled = true;
1211 chs.trackersOnly = true;
1212 chs.excludeID = remoteID;
1214 if (chl->pickHits(chs))
1217 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1218 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1227 chanMgr->hitlistlock.off();
1229 // return not available yet code
1230 atom2.writeInt(PCP_QUIT,error);
1231 sock->write(tbuf, mem.getPosition());
1236 // wait disconnect from other host
1238 while(sock->read(c, sizeof(c))){
1241 }catch(StreamException &e){
1242 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1247 LOG_DEBUG("Sending channel unavailable");
1248 sock->writeLine(HTTP_SC_UNAVAILABLE);
1249 sock->writeLine("");
1255 if (chanInfo.contentType != ChanInfo::T_MP3)
1258 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check
1261 sock->writeLine(ICY_OK);
1263 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1264 sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1265 sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1266 sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1267 sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1268 sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1269 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1271 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1276 sock->writeLine(HTTP_SC_OK);
1278 if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1280 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1282 sock->writeLine("Accept-Ranges: none");
1284 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1285 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1286 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1287 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1288 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1289 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
// Content-type (and protocol specific) headers, selected by output protocol.
1293 if (outputProtocol == ChanInfo::SP_HTTP)
1295 switch (chanInfo.contentType)
1297 case ChanInfo::T_OGG:
1298 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1300 case ChanInfo::T_MP3:
1301 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1303 case ChanInfo::T_MOV:
1304 sock->writeLine("Connection: close");
1305 sock->writeLine("Content-Length: 10000000");
1306 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1308 case ChanInfo::T_MPG:
1309 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1311 case ChanInfo::T_NSV:
1312 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1314 case ChanInfo::T_ASX:
1315 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1317 case ChanInfo::T_WMA:
1318 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1320 case ChanInfo::T_WMV:
1321 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1324 } else if (outputProtocol == ChanInfo::SP_MMS)
// Fixed header values written for MMS clients.
1326 sock->writeLine("Server: Rex/9.0.0.2980");
1327 sock->writeLine("Cache-Control: no-cache");
1328 sock->writeLine("Pragma: no-cache");
1329 sock->writeLine("Pragma: client-id=3587303426");
1330 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1334 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1337 if (agent.contains("Android"))
1339 LOG_DEBUG("INFO: Android client detected.");
1340 sock->writeLineF("%s %s", HTTP_HS_CONTENT, MIME_WMV);
1343 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1345 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1346 sock->writeLine("Connection: Keep-Alive");
1350 } else if (outputProtocol == ChanInfo::SP_PCP)
1352 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1353 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1355 }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1357 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
// Blank line terminates the header section.
1360 sock->writeLine("");
// Atom-level handshake, then the peer is recorded as a hit one hop away.
1365 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1366 atom.writeInt(PCP_OK,0);
1367 if (rhost.globalIP())
1368 serventHit.rhost[0] = rhost;
1370 serventHit.rhost[1] = rhost;
1371 serventHit.sessionID = remoteID;
1372 serventHit.numHops = 1;
1373 chanMgr->addHit(serventHit);
1383 // -----------------------------------
// Sends the GIV request line on `sock`: either "GIV /<idstr>" or a bare "GIV",
// followed by an empty line.
// NOTE(review): the condition choosing between the two forms and the
// conversion of `id` to `idstr` are on lines not shown (presumably
// id.isSet()) -- confirm.
1384 void Servent::handshakeGiv(GnuID &id)
1390 sock->writeLineF("GIV /%s",idstr);
1392 sock->writeLine("GIV");
1394 sock->writeLine("");
1398 // -----------------------------------
// Main loop for a Gnutella-style connection.
//
// Visible behaviour: the Gnutella stream is bound to `sock` and the status set
// to S_CONNECTED; a non-root node broadcasts its relays and sends a queued
// packet. While the thread is active and the socket is alive the loop
//  - reads a packet when the socket is readable, de-duplicates it through
//    servMgr->seenPacket() (pongs are exempt), processes it, and then
//    broadcasts / routes it and updates the matching statistics counter;
//  - sends one pending packet, priority queue first;
//  - pings and finally aborts (TimeoutException) on inactivity;
//  - computes a sleep delay from the bytes moved since the last iteration.
// NOTE(review): many lines (branch arms, the sleep call, the root-mode path)
// are not visible in this excerpt.
1399 void Servent::processGnutella()
1403 //if (servMgr->isRoot && !servMgr->needConnections())
1404 if (servMgr->isRoot)
1412 gnuStream.init(sock);
1413 setStatus(S_CONNECTED);
1415 if (!servMgr->isRoot)
1417 chanMgr->broadcastRelays(this, 1, 1);
1420 if ((p=outPacketsNorm.curr()))
1421 gnuStream.sendPacket(*p);
1427 // if (type != T_LOOKUP)
1428 // chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1430 lastPacket = lastPing = sys->getTime();
1431 bool doneBigPing=false;
1433 const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activity
1434 const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity
1436 unsigned int currBytes=0;
1437 unsigned int lastWait=0;
1439 unsigned int lastTotalIn=0,lastTotalOut=0;
1441 while (thread.active && sock->active())
1444 if (sock->readReady())
1446 lastPacket = sys->getTime();
1448 if (gnuStream.readPacket(pack))
1451 sock->host.toStr(ipstr);
1454 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
// Pongs skip the duplicate check; any other packet already seen by servMgr is
// marked R_DUPLICATE.
1456 if (pack.func != GNU_FUNC_PONG)
1457 if (servMgr->seenPacket(pack))
1458 ret = GnuStream::R_DUPLICATE;
1460 seenIDs.add(pack.id);
1463 if (ret == GnuStream::R_PROCESS)
1466 ret = gnuStream.processPacket(pack,this,routeID);
// Under flow control, packets that would have been broadcast are dropped.
1468 if (flowControl && (ret == GnuStream::R_BROADCAST))
1469 ret = GnuStream::R_DROP;
1475 case GnuStream::R_BROADCAST:
1476 if (servMgr->broadcast(pack,this))
1477 stats.add(Stats::NUMBROADCASTED);
1479 stats.add(Stats::NUMDROPPED);
1481 case GnuStream::R_ROUTE:
1482 if (servMgr->route(pack,routeID,NULL))
1483 stats.add(Stats::NUMROUTED);
1485 stats.add(Stats::NUMDROPPED);
1487 case GnuStream::R_ACCEPTED:
1488 stats.add(Stats::NUMACCEPTED);
1490 case GnuStream::R_DUPLICATE:
1491 stats.add(Stats::NUMDUP);
1493 case GnuStream::R_DEAD:
1494 stats.add(Stats::NUMDEAD);
1496 case GnuStream::R_DISCARD:
1497 stats.add(Stats::NUMDISCARDED);
1499 case GnuStream::R_BADVERSION:
1500 stats.add(Stats::NUMOLD);
1502 case GnuStream::R_DROP:
1503 stats.add(Stats::NUMDROPPED);
1508 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1513 LOG_ERROR("Bad packet");
1520 if ((p=outPacketsPri.curr())) // priority packet
1522 gnuStream.sendPacket(*p);
1524 outPacketsPri.next();
1525 } else if ((p=outPacketsNorm.curr())) // or.. normal packet
1527 gnuStream.sendPacket(*p);
1529 outPacketsNorm.next();
// Time elapsed since the last packet was received.
1532 int lpt = sys->getTime()-lastPacket;
1536 if ((sys->getTime()-lastPing) > 15)
1539 lastPing = sys->getTime();
1543 if (lpt > packetTimeoutSecs)
1546 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1549 lastPing = sys->getTime();
1554 if (lpt > abortTimeoutSecs)
1555 throw TimeoutException();
// Bytes transferred in both directions since the previous iteration.
1558 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1559 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1561 unsigned int bytes = totIn+totOut;
1563 lastTotalIn = sock->totalBytesIn;
1564 lastTotalOut = sock->totalBytesOut;
// With serventBandwidth fixed at 1000, serventBandwidth/8 is 125 (integer
// division), so the delay below works out to bytes*8; it is then raised to at
// least sys->idleSleepTime. NOTE(review): units are not stated in the code.
1566 const int serventBandwidth = 1000;
1568 int delay = sys->idleSleepTime;
1569 if ((bytes) && (serventBandwidth >= 8))
1570 delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize
1572 if (delay < (int)sys->idleSleepTime)
1573 delay = sys->idleSleepTime;
1574 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1582 // -----------------------------------
// Packet loop used for a root-mode connection.
//
// Visible behaviour, per received packet:
//  - PING: fetches up to 32 of the newest servents and answers with at most 8
//    pongs, starting at a random index and wrapping around the list;
//  - PONG: parses port and IP from the payload and, when both are non-zero and
//    the host is a global IP, adds it to servMgr as a T_SERVENT host;
//  - HIT: parses the hit from the payload via gnuStream.readHit().
// A 60 second limit measured from loop start is checked each iteration.
// StreamExceptions are caught and logged as "Relay: ...".
// NOTE(review): several lines (guards, the action taken when the 60 second
// limit is hit) are not visible in this excerpt.
1583 void Servent::processRoot()
1588 gnuStream.init(sock);
1589 setStatus(S_CONNECTED);
// NOTE(review): this local has the same name as the lastConnect value that
// setStatus()/reset() assign, and hides it for the rest of this function.
1593 unsigned int lastConnect = sys->getTime();
1595 while (thread.active && sock->active())
1597 if (gnuStream.readPacket(pack))
1600 sock->host.toStr(ipstr);
1602 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1605 if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close
1609 int cnt = servMgr->getNewestServents(hl,32,sock->host);
// NOTE(review): `% cnt` requires cnt != 0; the guard (the "No Pongs to send"
// branch suggests one exists) is on a line not shown -- confirm.
1612 int start = sys->rnd() % cnt;
1613 int max = cnt>8?8:cnt;
1615 for(int i=0; i<max; i++)
1619 pong.initPong(hl[start],false,pack);
1620 gnuStream.sendPacket(pong);
1623 hl[start].toStr(ipstr);
1625 //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1626 start = (start+1) % cnt;
1629 sock->host.toStr(str);
1630 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1633 LOG_NETWORK("No Pongs to send");
1636 }else if (pack.func == GNU_FUNC_PONG) // pong?
1638 MemoryStream pong(pack.data,pack.len);
1641 port = pong.readShort();
1642 ip = pong.readLong();
1647 if ((ip) && (port) && (h.globalIP()))
1650 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1651 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1654 } else if (pack.func == GNU_FUNC_HIT)
1656 MemoryStream data(pack.data,pack.len);
1658 gnuStream.readHit(data,hit,pack.hops,pack.id);
1661 //if (gnuStream.packetsIn > 5) // die if we get too many packets
1665 if((sys->getTime()-lastConnect > 60))
1670 }catch(StreamException &e)
1672 LOG_ERROR("Relay: %s",e.msg);
1678 // -----------------------------------
// Thread body for a GIV connection. thread->data is taken to be the owning
// Servent; it sends the GIV line for sv->givID and then runs the normal
// incoming handshake on the same connection.
// StreamExceptions are caught and logged as "GIV: ..."; sys->endThread() is
// called for the thread.
// NOTE(review): servent cleanup and the return value are on lines not shown.
1679 int Servent::givProcMain(ThreadInfo *thread)
1682 Servent *sv = (Servent*)thread->data;
1685 sv->handshakeGiv(sv->givID);
1686 sv->handshakeIncoming();
1688 }catch(StreamException &e)
1690 LOG_ERROR("GIV: %s",e.msg);
1694 sys->endThread(thread);
1698 // -----------------------------------
// Thread entry point installed by initGIV(). It hands off to givProcMain
// through the SEH_THREAD macro. NOTE(review): the macro's definition is not
// visible here; it is presumably the structured-exception wrapper from
// win32/seh.h -- confirm.
1699 int Servent::givProc(ThreadInfo *thread)
1701 SEH_THREAD(givProcMain, Servent::givProc);
1704 // -----------------------------------
// Client side of the PCP atom handshake: sends HELO and processes the OLEH
// reply.
//
// HELO: assembled in a local buffer and written with one atom.io.write(). It
// always carries agent, version and session ID (3 children); port, ping and
// broadcast ID are optional, and the child count passed to writeParent adds
// one for each of nonFW, testFW and sendBCID.
// OLEH: an unexpected reply sends PCP_QUIT (BADRESPONSE) and raises
// StreamException. Children read: agent, remote IP and port (how the peer sees
// us), version, disable flag, session ID (stored in `rid`; equal to our own ->
// StreamException("Servent loopback")). Unknown children are logged as
// skipped.
// Afterwards: our server IP is updated from the reported one when it differs
// and no forced IP is configured; an unknown firewall state is resolved to
// FW_OFF when the peer reported a non-zero port and a global IP, otherwise
// FW_ON; servMgr->isDisabled is set or cleared; an unidentified remote sends
// PCP_QUIT (NOTIDENTIFIED) and raises StreamException.
// NOTE(review): the guards for the optional HELO children, how `rhost`,
// `agent` and the disable/identification conditions are evaluated are on
// lines not shown in this excerpt.
1705 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1708 bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1709 bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1711 bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1714 MemoryStream mem(tbuf, sizeof(tbuf));
1715 AtomStream atom2(mem);
1716 atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1717 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1718 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1719 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1721 atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1723 atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1725 atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1726 atom.io.write(tbuf, mem.getPosition());
1729 LOG_DEBUG("PCP outgoing waiting for OLEH..");
1732 ID4 id = atom.read(numc,numd);
1735 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1736 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1737 throw StreamException("Got unexpected PCP response");
1752 // read OLEH response
1753 for(int i=0; i<numc; i++)
1756 ID4 id = atom.read(c,dlen);
1758 if (id == PCP_HELO_AGENT)
1760 atom.readString(arg,sizeof(arg),dlen);
1763 }else if (id == PCP_HELO_REMOTEIP)
1765 thisHost.ip = atom.readInt();
1767 }else if (id == PCP_HELO_PORT)
1769 thisHost.port = atom.readShort();
1771 }else if (id == PCP_HELO_VERSION)
1773 version = atom.readInt();
1775 }else if (id == PCP_HELO_DISABLE)
1777 disable = atom.readInt();
1779 }else if (id == PCP_HELO_SESSIONID)
1781 atom.readBytes(rid.id,16);
1782 if (rid.isSame(servMgr->sessionID))
1783 throw StreamException("Servent loopback");
1787 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1794 // update server ip/firewall status
1797 if (thisHost.isValid())
1799 if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1802 thisHost.toStr(ipstr);
1803 LOG_DEBUG("Got new ip: %s",ipstr);
1804 servMgr->serverHost.ip = thisHost.ip;
1807 if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1809 if (thisHost.port && thisHost.globalIP())
1810 servMgr->setFirewall(ServMgr::FW_OFF);
1812 servMgr->setFirewall(ServMgr::FW_ON);
1818 LOG_ERROR("client disabled: %d",disable);
1819 servMgr->isDisabled = true;
1822 servMgr->isDisabled = false;
1830 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1831 throw StreamException("Remote host not identified");
1834 LOG_DEBUG("PCP Outgoing handshake complete.");
1838 // -----------------------------------
// Server side of the PCP atom handshake: reads HELO and answers with OLEH.
//
// HELO: an unexpected first atom sends PCP_QUIT (BADRESPONSE) and raises
// StreamException. Children read: agent, version, session ID (stored in `rid`;
// equal to our own -> StreamException("Servent loopback")), broadcast ID, OS
// type, port (stored in rhost.port) and ping port. Unknown children are logged
// as skipped.
// Address fix-up: when the peer's address is not a global IP but ours is, the
// peer is reported under our server IP.
// Firewall test: for a ping request, rhost.port is set to the ping port and
// the host is probed with pingHost(); the branch is taken when the host is not
// global or the ping fails.
// Root mode: a broadcast ID with flag bit 0 set (private) must be known and
// valid, otherwise an OLEH with PCP_HELO_DISABLE is sent and
// StreamException("Client is banned") is raised.
// OLEH: assembled in a local buffer with 5 children (agent, session ID,
// version, remote IP, port). A version below PCP_CLIENT_MINVERSION, or an
// unidentified remote, appends PCP_QUIT, flushes the buffer and raises
// StreamException. Root nodes append root atoms. The buffer is then written
// with one atom.io.write().
// NOTE(review): several guards and the consequence of a failed ping are on
// lines not shown in this excerpt.
1839 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1842 ID4 id = atom.read(numc,numd);
1847 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1848 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1849 throw StreamException("Got unexpected PCP response");
1868 for(int i=0; i<numc; i++)
1872 ID4 id = atom.read(c,dlen);
1874 if (id == PCP_HELO_AGENT)
1876 atom.readString(arg,sizeof(arg),dlen);
1879 }else if (id == PCP_HELO_VERSION)
1881 version = atom.readInt();
1883 }else if (id == PCP_HELO_SESSIONID)
1885 atom.readBytes(rid.id,16);
1886 if (rid.isSame(servMgr->sessionID))
1887 throw StreamException("Servent loopback");
1889 }else if (id == PCP_HELO_BCID)
1891 atom.readBytes(bcID.id,16);
1893 }else if (id == PCP_HELO_OSTYPE)
1895 osType = atom.readInt();
1896 }else if (id == PCP_HELO_PORT)
1898 rhost.port = atom.readShort();
1899 }else if (id == PCP_HELO_PING)
1901 pingPort = atom.readShort();
1904 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1911 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1914 if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1915 rhost.ip = servMgr->serverHost.ip;
1920 rhost.toStr(ripStr);
1921 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1922 rhost.port = pingPort;
1923 if (!rhost.globalIP() || !pingHost(rhost,rid))
1927 if (servMgr->isRoot)
1931 if (bcID.getFlags() & 1) // private
1933 BCID *bcid = servMgr->findValidBCID(bcID);
// The inner `bcid &&` is redundant after `!bcid ||`; the test is simply
// "unknown or not valid".
1934 if (!bcid || (bcid && !bcid->valid))
1936 atom.writeParent(PCP_OLEH,1);
1937 atom.writeInt(PCP_HELO_DISABLE,1);
1938 throw StreamException("Client is banned");
1946 MemoryStream mem(tbuf, sizeof(tbuf));
1947 AtomStream atom2(mem);
1948 atom2.writeParent(PCP_OLEH,5);
1949 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1950 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1951 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1952 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1953 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1957 if (version < PCP_CLIENT_MINVERSION)
1959 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1960 atom.io.write(tbuf, mem.getPosition());
1961 throw StreamException("Agent is not valid");
1967 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1968 atom.io.write(tbuf, mem.getPosition());
1969 throw StreamException("Remote host not identified");
1974 if (servMgr->isRoot)
1976 servMgr->writeRootAtoms(atom2,false);
1979 atom.io.write(tbuf, mem.getPosition());
1981 LOG_DEBUG("PCP Incoming handshake complete.");
1985 // -----------------------------------
// Handles an incoming PCP control connection on this servent's socket:
// reads the PCP version, runs the incoming handshake, then either rejects the
// peer with a PCP_QUIT error code or answers PCP_OK and processes packets until
// an error occurs, the thread is deactivated or the socket reaches EOF.
// NOTE(review): suggestOthers presumably gates the tracker-suggestion step below - confirm.
1986 void Servent::processIncomingPCP(bool suggestOthers)
1988 PCPStream::readVersion(*sock);
1991 AtomStream atom(*sock);
1992 Host rhost = sock->host;
1994 handshakeIncomingPCP(atom,rhost,remoteID,agent);
// Rejection conditions: a T_COUT or T_CIN connection with the same remote ID already exists,
// servMgr reports controlInFull(), or this node is neither root nor broadcasting.
1997 bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1998 || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1999 bool unavailable = servMgr->controlInFull();
2000 bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
2005 if (unavailable || alreadyConnected || offair)
// Choose the QUIT reason that will be sent to the peer.
2009 if (alreadyConnected)
2010 error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
2011 else if (unavailable)
2012 error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
2014 error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
2016 error = PCP_ERROR_QUIT;
// Every hit search below is restricted to trackers (trackersOnly), excludes the
// requesting peer (excludeID = remoteID) and disables useBusyControls.
2026 for(int i=0; i<8; i++)
2030 // find best hit on this network
2031 if (!rhost.globalIP())
2034 chs.matchHost = servMgr->serverHost;
2036 chs.excludeID = remoteID;
2037 chs.trackersOnly = true;
2038 chs.useBusyControls = false;
2039 if (chanMgr->pickHits(chs))
2043 // find best hit on same network
2047 chs.matchHost = rhost;
2049 chs.excludeID = remoteID;
2050 chs.trackersOnly = true;
2051 chs.useBusyControls = false;
2052 if (chanMgr->pickHits(chs))
2056 // else find best hit on other networks
2061 chs.excludeID = remoteID;
2062 chs.trackersOnly = true;
2063 chs.useBusyControls = false;
2064 if (chanMgr->pickHits(chs))
2073 best.writeAtoms(atom,noID);
2078 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2080 else if (rhost.port)
2082 // send push request to best firewalled tracker on other network
2085 chs.excludeID = remoteID;
2086 chs.trackersOnly = true;
2087 chs.useFirewalled = true;
2088 chs.useBusyControls = false;
2089 if (chanMgr->pickHits(chs))
2094 int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2095 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2099 LOG_DEBUG("No available trackers");
2104 LOG_ERROR("Sending QUIT to incoming: %d",error);
2106 atom.writeInt(PCP_QUIT,error);
// Accepted path: mark the servent connected, acknowledge with PCP_OK and
// write a PCP_ROOT parent atom containing an empty PCP_ROOT_UPDATE.
2112 setStatus(S_CONNECTED);
2114 atom.writeInt(PCP_OK,0);
2117 atom.writeParent(PCP_ROOT,1);
2118 atom.writeParent(PCP_ROOT_UPDATE,0);
2120 pcpStream = new PCPStream(remoteID);
// Packet loop: runs until readPacket reports an error, the thread is deactivated or the socket hits EOF.
2124 while (!error && thread.active && !sock->eof())
2126 error = pcpStream->readPacket(*sock,bcs);
// Going off air (non-root and not broadcasting) or application shutdown overrides the error code.
2129 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2130 error = PCP_ERROR_OFFAIR;
2131 if (peercastInst->isQuitting)
2132 error = PCP_ERROR_SHUTDOWN;
2135 pcpStream->flush(*sock);
// The code sent in PCP_QUIT is the loop's error offset by PCP_ERROR_QUIT.
2137 error += PCP_ERROR_QUIT;
2138 atom.writeInt(PCP_QUIT,error);
2140 LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2144 // -----------------------------------
2145 int Servent::outgoingProcMain(ThreadInfo *thread)
2148 LOG_DEBUG("COUT started");
2150 Servent *sv = (Servent*)thread->data;
2154 sv->pcpStream = new PCPStream(noID);
2156 while (sv->thread.active)
2158 sv->setStatus(S_WAIT);
2160 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2170 if (servMgr->rootHost.isEmpty())
2175 sv->sock = sv->pushSock;
2176 sv->pushSock = NULL;
2177 bestHit.host = sv->sock->host;
2183 ChanHitList *chl = chanMgr->findHitListByID(noID);
2186 // find local tracker
2188 chs.matchHost = servMgr->serverHost;
2189 chs.waitDelay = MIN_TRACKER_RETRY;
2190 chs.excludeID = servMgr->sessionID;
2191 chs.trackersOnly = true;
2192 if (!chl->pickHits(chs))
2194 // else find global tracker
2196 chs.waitDelay = MIN_TRACKER_RETRY;
2197 chs.excludeID = servMgr->sessionID;
2198 chs.trackersOnly = true;
2204 bestHit = chs.best[0];
2209 unsigned int ctime = sys->getTime();
2211 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2213 bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2215 chanMgr->lastYPConnect = ctime;
2219 }while (!bestHit.host.ip && (sv->thread.active));
2222 if (!bestHit.host.ip) // give up
2224 LOG_ERROR("COUT giving up");
2229 bestHit.host.toStr(ipStr);
2235 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2239 sv->setStatus(S_CONNECTING);
2240 sv->sock = sys->createSocket();
2242 throw StreamException("Unable to create socket");
2243 sv->sock->open(bestHit.host);
2244 sv->sock->connect();
2248 sv->sock->setReadTimeout(30000);
2249 AtomStream atom(*sv->sock);
2251 sv->setStatus(S_HANDSHAKE);
2253 Host rhost = sv->sock->host;
2254 atom.writeInt(PCP_CONNECT,1);
2255 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2257 sv->setStatus(S_CONNECTED);
2259 LOG_DEBUG("COUT to %s: OK",ipStr);
2261 sv->pcpStream->init(sv->remoteID);
2264 bcs.servent_id = sv->servent_id;
2266 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2268 error = sv->pcpStream->readPacket(*sv->sock,bcs);
2272 if (!chanMgr->isBroadcasting())
2273 error = PCP_ERROR_OFFAIR;
2274 if (peercastInst->isQuitting)
2275 error = PCP_ERROR_SHUTDOWN;
2277 if (sv->pcpStream->nextRootPacket)
2278 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2279 error = PCP_ERROR_NOROOT;
2281 sv->setStatus(S_CLOSING);
2283 sv->pcpStream->flush(*sv->sock);
2285 error += PCP_ERROR_QUIT;
2286 atom.writeInt(PCP_QUIT,error);
2288 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2290 }catch(TimeoutException &e)
2292 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2293 sv->setStatus(S_TIMEOUT);
2294 }catch(StreamException &e)
2296 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2297 sv->setStatus(S_ERROR);
2309 }catch(StreamException &) {}
2311 // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2312 if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2313 chanMgr->deadHit(bestHit);
2321 sys->endThread(thread);
2322 LOG_DEBUG("COUT ended");
2325 // -----------------------------------
// Thread entry point for the outgoing control connection; delegates to
// outgoingProcMain through the SEH_THREAD macro (from win32/seh.h).
2326 int Servent::outgoingProc(ThreadInfo *thread)
2328 SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2330 // -----------------------------------
// Thread body for an accepted incoming connection. thread->data carries the
// Servent; the function runs its incoming handshake, logs HTTP and stream
// failures together with the peer address, and ends with sys->endThread(thread).
2331 int Servent::incomingProcMain(ThreadInfo *thread)
2335 Servent *sv = (Servent*)thread->data;
2338 sv->sock->host.toStr(ipStr);
2342 sv->handshakeIncoming();
2343 }catch(HTTPException &e)
// Report the HTTP error to the client: the exception message is written as a line,
// followed by a Basic-auth challenge header and a terminating blank line.
2347 sv->sock->writeLine(e.msg);
2349 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2350 sv->sock->writeLine("");
// Failures while writing the error response are deliberately ignored.
2351 }catch(StreamException &){}
2352 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2353 }catch(StreamException &e)
2355 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2360 sys->endThread(thread);
2363 // -----------------------------------
// Thread entry point for incoming connections; delegates to
// incomingProcMain through the SEH_THREAD macro (from win32/seh.h).
2364 int Servent::incomingProc(ThreadInfo *thread)
2366 SEH_THREAD(incomingProcMain, Servent::incomingProc);
2368 // -----------------------------------
// Puts the servent into the S_HANDSHAKE state before processing its connection.
// Throws StreamException("Servent has no socket") when no socket is available.
2369 void Servent::processServent()
2371 setStatus(S_HANDSHAKE);
2376 throw StreamException("Servent has no socket");
2381 // -----------------------------------
// Streams a channel to the connected client using the sender that matches
// outputProtocol (HTTP, MMS, PCP or PeerCast), then sets the status to S_CLOSING.
// chanInfo identifies the requested channel.
// Throws StreamException when the channel header does not become ready or the
// channel cannot be found.
// NOTE(review): doneHandshake presumably skips the handshakeStream() step - confirm.
2382 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2386 setStatus(S_HANDSHAKE);
2388 if (!handshakeStream(chanInfo))
2392 if (chanInfo.id.isSet())
2395 chanID = chanInfo.id;
2397 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2399 if (!waitForChannelHeader(chanInfo))
2400 throw StreamException("Channel not ready");
2402 servMgr->totalStreams++;
2404 Host host = sock->host;
2405 host.port = 0; // force to 0 so we ignore the incoming port
2407 Channel *ch = chanMgr->findChannelByID(chanID);
2409 throw StreamException("Channel not found");
2411 if (outputProtocol == ChanInfo::SP_HTTP)
// HTTP clients get the metadata-interleaved stream only when metadata was requested
// and an ICY meta interval is configured.
2413 if ((addMetadata) && (chanMgr->icyMetaInterval))
2414 sendRawMetaChannel(chanMgr->icyMetaInterval);
2416 sendRawChannel(true,true);
2418 }else if (outputProtocol == ChanInfo::SP_MMS)
2422 sendRawChannel(true,true);
2425 sendRawChannel(true,false);
2428 }else if (outputProtocol == ChanInfo::SP_PCP)
2432 } else if (outputProtocol == ChanInfo::SP_PEERCAST)
2434 sendPeercastChannel();
2438 setStatus(S_CLOSING);
2441 // -----------------------------------------
2445 file.openReadOnly("c://test.mp3");
2447 LOG_DEBUG("raw file read");
2452 LOG_DEBUG("send %d",cnt++);
2453 file.read(buf,sizeof(buf));
2454 sock->write(buf,sizeof(buf));
2458 LOG_DEBUG("raw file sent");
2463 // -----------------------------------
// Polls (at most 30*10 iterations) for the channel identified by info.id to be
// playing with data in its raw buffer (rawData.writePos > 0).
// Also checks on each iteration whether this servent's thread or socket has gone inactive.
// NOTE(review): processStream treats a false result as "Channel not ready".
2464 bool Servent::waitForChannelHeader(ChanInfo &info)
2466 for(int i=0; i<30*10; i++)
2468 Channel *ch = chanMgr->findChannelByID(info.id);
2472 if (ch->isPlaying() && (ch->rawData.writePos>0))
2475 if (!thread.active || !sock->active())
2481 // -----------------------------------
// Streams the channel identified by chanID to the socket as raw data.
// NOTE(review): sendHead / sendData presumably gate the header write and the
// packet loop respectively - confirm.
// Throws StreamException("Channel not found") if the channel does not exist;
// StreamExceptions raised while streaming are caught and logged.
2482 void Servent::sendRawChannel(bool sendHead, bool sendData)
// DIRECT_WRITE_TIMEOUT is scaled by 1000 for the socket write timeout.
2487 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2489 Channel *ch = chanMgr->findChannelByID(chanID);
2491 throw StreamException("Channel not found");
2493 setStatus(S_CONNECTED);
2495 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
// After the header packet, streaming continues from the position right behind it.
2499 ch->headPack.writeRaw(*sock);
2500 streamPos = ch->headPack.pos + ch->headPack.len;
2501 LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2507 unsigned int streamIndex = ch->streamIndex;
2508 unsigned int connectTime = sys->getTime();
2509 unsigned int lastWriteTime = connectTime;
2511 while ((thread.active) && sock->active())
// The channel is looked up again on every iteration.
2513 ch = chanMgr->findChannelByID(chanID);
// A changed streamIndex means the channel has a new stream: restart from the header position.
2518 if (streamIndex != ch->streamIndex)
2520 streamIndex = ch->streamIndex;
2521 streamPos = ch->headPack.pos;
2522 LOG_DEBUG("sendRaw got new stream index");
2526 if (ch->rawData.findPacket(streamPos,rawPack))
// syncPos holds the expected sync number; a mismatch is logged as skipped packets.
2528 if (syncPos != rawPack.sync)
2529 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2530 syncPos = rawPack.sync+1;
2532 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2534 rawPack.writeRaw(*sock);
2535 lastWriteTime = sys->getTime();
2538 if (rawPack.pos < streamPos)
2539 LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2540 streamPos = rawPack.pos+rawPack.len;
// No packet to send: if the socket is readable, read one byte; a zero result closes the socket.
2541 } else if (sock->readReady()) {
2543 int error = sock->readUpto(&c, 1);
2544 if (error == 0) sock->close();
// Give up when nothing has been written for longer than DIRECT_WRITE_TIMEOUT.
2548 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2549 throw TimeoutException();
2554 }catch(StreamException &e)
2556 LOG_ERROR("Stream channel: %s",e.msg);
2561 // -----------------------------------
// Streams all currently playing channels to the socket as interleaved raw data.
// The IDs of playing channels are collected first; per-channel stream index and
// position are tracked in parallel arrays indexed like chanIDs.
// StreamExceptions raised while streaming are caught and logged.
// NOTE(review): sendHead / sendData presumably gate the header loop and the
// data loop respectively - confirm.
2562 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2566 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2567 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2568 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
// Collect the IDs of every channel that is currently playing.
2570 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2572 Channel *ch = &chanMgr->channels[i];
2573 if (ch->isPlaying())
2574 chanIDs[numChanIDs++]=ch->info.id;
2579 setStatus(S_CONNECTED);
// Header pass: write each channel's header packet and record where its data starts.
2584 for(int i=0; i<numChanIDs; i++)
2586 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2589 LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2590 ch->headPack.writeRaw(*sock);
2591 chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2592 chanStreamIndex[i] = ch->streamIndex;
2593 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2602 unsigned int connectTime=sys->getTime();
2604 while ((thread.active) && sock->active())
// NOTE(review): this loop starts at i=1 whereas the header loop above starts at i=0,
// so the first collected channel (index 0) is never streamed here - looks like an
// off-by-one; confirm intent before changing.
2607 for(int i=1; i<numChanIDs; i++)
2609 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
// A changed streamIndex means the channel has a new stream: restart from its header position.
2612 if (chanStreamIndex[i] != ch->streamIndex)
2614 chanStreamIndex[i] = ch->streamIndex;
2615 chanStreamPos[i] = ch->headPack.pos;
2616 LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2620 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2622 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2623 rawPack.writeRaw(*sock);
2626 if (rawPack.pos < chanStreamPos[i])
2627 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2628 chanStreamPos[i] = rawPack.pos+rawPack.len;
2631 //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2641 }catch(StreamException &e)
2643 LOG_ERROR("Stream channel: %s",e.msg);
2648 // -----------------------------------
// Streams the channel identified by chanID as raw data with a metadata block
// (StreamTitle / StreamUrl) inserted after every `interval` bytes of audio.
// interval: number of stream bytes between metadata blocks; must be between 1
//           and sizeof(buf), otherwise a StreamException is thrown.
// Throws StreamException("Channel not found") if the channel does not exist;
// StreamExceptions raised while streaming are caught and logged.
2649 void Servent::sendRawMetaChannel(int interval)
2654 Channel *ch = chanMgr->findChannelByID(chanID);
2656 throw StreamException("Channel not found");
2658 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2660 setStatus(S_CONNECTED);
2662 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
// Last title/URL sent, used to send metadata only when it changes.
2665 String lastTitle,lastURL;
2667 int lastMsgTime=sys->getTime();
2673 if ((interval > sizeof(buf)) || (interval < 1))
2674 throw StreamException("Bad ICY Meta Interval value");
2676 unsigned int connectTime = sys->getTime();
2677 unsigned int lastWriteTime = connectTime;
2679 streamPos = 0; // raw meta channel has no header (its MP3)
2681 while ((thread.active) && sock->active())
2683 ch = chanMgr->findChannelByID(chanID);
2689 if (ch->rawData.findPacket(streamPos,rawPack))
// syncPos holds the expected sync number; a mismatch is logged as skipped packets.
2692 if (syncPos != rawPack.sync)
2693 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2694 syncPos = rawPack.sync+1;
2696 MemoryStream mem(rawPack.data,rawPack.len);
2698 if (rawPack.type == ChanPacket::T_DATA)
2701 int len = rawPack.len;
2702 char *p = rawPack.data;
// Copy packet data into buf, clamping each copy so buf never holds more than `interval` bytes.
2706 if ((bufPos+rl) > interval)
2707 rl = interval-bufPos;
2708 memcpy(&buf[bufPos],p,rl);
// A full interval has been buffered: send it, then decide which metadata to emit.
2713 if (bufPos >= interval)
2716 sock->write(buf,interval);
2717 lastWriteTime = sys->getTime();
2719 if (chanMgr->broadcastMsgInterval)
2720 if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2723 lastMsgTime = sys->getTime();
// The channel comment replaces the track title when it is non-empty and showMsg is set.
2726 String *metaTitle = &ch->info.track.title;
2727 if (!ch->info.comment.isEmpty() && (showMsg))
2728 metaTitle = &ch->info.comment;
2731 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2740 title.convertTo(String::T_META);
2741 url.convertTo(String::T_META);
// Metadata block layout as written here: one length byte counting 16-byte units,
// followed by len*16 bytes taken from tmp.
// NOTE(review): sprintf is unbounded, so a long title/url could overflow tmp; and
// write() sends bytes beyond the string terminator, which are only well-defined if
// tmp was zero-filled beforehand - verify tmp's size and initialisation.
2743 sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2744 int len = ((strlen(tmp) + 15+1) / 16);
2745 sock->writeChar(len);
2746 sock->write(tmp,len*16);
2748 lastTitle = *metaTitle;
2749 lastURL = ch->info.url;
2751 LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2761 streamPos = rawPack.pos + rawPack.len;
// Give up when nothing has been written for longer than DIRECT_WRITE_TIMEOUT.
2764 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2765 throw TimeoutException();
2770 }catch(StreamException &e)
2772 LOG_ERROR("Stream channel: %s",e.msg);
2775 // -----------------------------------
// Streams the channel identified by chanID in the PeerCast format: a "PCST" tag,
// the header packet and a T_META packet, followed by the channel's packets, each
// T_DATA/T_HEAD packet preceded by a "SYNC" record.
// Throws StreamException("Channel not found") if the channel does not exist;
// StreamExceptions raised while streaming are caught and logged.
2776 void Servent::sendPeercastChannel()
2780 setStatus(S_CONNECTED);
2782 Channel *ch = chanMgr->findChannelByID(chanID);
2784 throw StreamException("Channel not found");
2786 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2788 sock->writeTag("PCST");
2792 ch->headPack.writePeercast(*sock);
// The metadata packet is built from the channel's insertMeta buffer at the current stream position.
2794 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2795 pack.writePeercast(*sock);
2798 unsigned int syncPos=0;
2799 while ((thread.active) && sock->active())
2801 ch = chanMgr->findChannelByID(chanID);
2806 if (ch->rawData.findPacket(streamPos,rawPack))
2808 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
// SYNC record: tag, two shorts (4 and 0), then the 4-byte sync counter.
// NOTE(review): syncPos is written as raw in-memory bytes, so the byte order on the wire follows the host - confirm.
2810 sock->writeTag("SYNC");
2811 sock->writeShort(4);
2812 sock->writeShort(0);
2813 sock->write(&syncPos,4);
2816 rawPack.writePeercast(*sock);
2818 streamPos = rawPack.pos + rawPack.len;
2824 }catch(StreamException &e)
2826 LOG_ERROR("Stream channel: %s",e.msg);
2830 //WLock canStreamLock;
2832 // -----------------------------------
// Streams the channel identified by chanID to the peer as PCP atoms: first a
// PCP_CHAN atom with channel info, track info and (optionally) the header packet,
// then one PCP_CHAN/PCP_CHAN_PKT atom per channel packet. Between packets it
// flushes queued outgoing data and processes incoming PCP packets.
// Throws StreamException("Channel not found") if the channel does not exist at entry;
// StreamExceptions raised while streaming are caught and logged, and a final
// PCP_QUIT atom with the error code is attempted.
2833 void Servent::sendPCPChannel()
2835 bool skipCheck = false;
2836 unsigned int ptime = 0;
2837 int npacket = 0, upsize = 0;
2839 Channel *ch = chanMgr->findChannelByID(chanID);
2841 throw StreamException("Channel not found");
2843 AtomStream atom(*sock);
2845 pcpStream = new PCPStream(remoteID);
2851 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2854 // setStatus(S_CONNECTED);
2856 //canStreamLock.on();
2857 //thread.active = canStream(ch);
2858 //setStatus(S_CONNECTED);
2859 //canStreamLock.off();
// Initial PCP_CHAN atom: 3 children, plus one more for the header packet when sendHeader is set.
2866 atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2867 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2868 ch->info.writeInfoAtoms(atom);
2869 ch->info.writeTrackAtoms(atom);
2872 atom.writeParent(PCP_CHAN_PKT,3);
2873 atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2874 atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2875 atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2878 streamPos = ch->headPack.pos+ch->headPack.len;
2879 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2883 unsigned int streamIndex = ch->streamIndex;
// Packet atoms are assembled in pbuf through atom2 and then written to the socket in one call.
2886 char pbuf[ChanPacket::MAX_DATALEN*3];
2887 MemoryStream mems(pbuf,sizeof(pbuf));
2888 AtomStream atom2(mems);
2890 while (thread.active)
// Note: this declaration shadows the outer `ch`; the channel is looked up again on each iteration.
2893 Channel *ch = chanMgr->findChannelByID(chanID);
// A changed streamIndex means the channel has a new stream: restart from the header position.
2898 if (streamIndex != ch->streamIndex)
2900 streamIndex = ch->streamIndex;
2901 streamPos = ch->headPack.pos;
2902 LOG_DEBUG("sendPCPStream got new stream index");
2907 if (ch->rawData.findPacket(streamPos,rawPack))
// The found packet lies ahead of streamPos: data was skipped for this peer.
2909 if ((streamPos < rawPack.pos) && !rawPack.skip){
2912 getHost().IPtoStr(tmp);
2913 LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
// A second skip at the same timestamp as the previous one jumps straight to the newest buffered position.
2915 if (sys->getTime() == lastSkipTime) {
2916 LOG_DEBUG("##### skip all buffer");
2917 streamPos = ch->rawData.getLatestPos();
2921 lastSkipTime = sys->getTime();
2928 if (rawPack.type == ChanPacket::T_HEAD)
2930 atom2.writeParent(PCP_CHAN,2);
2931 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2932 atom2.writeParent(PCP_CHAN_PKT,3);
2933 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2934 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2935 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2937 sock->write(pbuf, mems.getPosition());
2938 }else if (rawPack.type == ChanPacket::T_DATA)
2940 atom2.writeParent(PCP_CHAN,2);
2941 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2942 atom2.writeParent(PCP_CHAN_PKT,3);
2943 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2944 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2945 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
// Data packets go out either through the socket's buffering writer (whose skip
// statistics are copied back into this servent) or through a direct write.
2948 sock->bufferingWrite(pbuf, mems.getPosition());
2949 lastSkipTime = sock->bufList.lastSkipTime;
2950 lastSkipCount = sock->bufList.skipCount;
2952 sock->write(pbuf, mems.getPosition());
2956 if (rawPack.pos < streamPos)
2957 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2959 //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2961 streamPos = rawPack.pos+rawPack.len;
2964 throw StreamException("Channel not found");
// bufferingWrite is also called with no new data (NULL, 0).
// NOTE(review): presumably this drains previously buffered data - confirm.
2968 sock->bufferingWrite(NULL, 0);
2969 lastSkipTime = sock->bufList.lastSkipTime;
2970 lastSkipCount = sock->bufList.skipCount;
2973 bcs.servent_id = servent_id;
2974 // error = pcpStream->readPacket(*sock,bcs);
2976 unsigned int t = sys->getTime();
// Budgets for packets processed (npacket) and bytes sent upstream (upsize) are
// reset to MAX_PROC_PACKETS and MAX_OUTWARD_SIZE.
2979 npacket = MAX_PROC_PACKETS;
2980 upsize = MAX_OUTWARD_SIZE;
2983 int len = pcpStream->flushUb(*sock, upsize);
2986 while (npacket > 0 && sock->readReady()) {
2988 error = pcpStream->readPacket(*sock,bcs);
2990 throw StreamException("PCP exception");
2997 LOG_DEBUG("PCP channel stream closed normally.");
2999 }catch(StreamException &e)
3001 LOG_ERROR("Stream channel: %s",e.msg);
// Best-effort shutdown: flush pending data and send PCP_QUIT; failures here are ignored.
3006 pcpStream->flush(*sock);
3007 atom.writeInt(PCP_QUIT,error);
3008 }catch(StreamException &) {}
3012 // -----------------------------------
3013 int Servent::serverProcMain(ThreadInfo *thread)
3018 Servent *sv = (Servent*)thread->data;
3023 throw StreamException("Server has no socket");
3025 sv->setStatus(S_LISTENING);
3029 sv->sock->host.toStr(servIP);
3031 if (servMgr->isRoot)
3032 LOG_DEBUG("Root Server started: %s",servIP);
3034 LOG_DEBUG("Server started: %s",servIP);
3037 while ((thread->active) && (sv->sock->active()))
3039 if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3041 ClientSocket *cs = sv->sock->accept();
3043 // Exclude invalid source addresses (IPv4 multicast)
3044 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3049 LOG_ERROR("reject incoming multicast address: %s", ip);
3050 peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3054 // countermeasure against DoS Atk
3055 if (cs->host.ip != (0x7F000001)) // bypass loopback
3058 addrCont clientAddr(cs->host.ip);
3059 servMgr->IP_blacklist->lock();
3060 if (servMgr->IP_blacklist->find(clientAddr))
3063 servMgr->IP_blacklist->unlock();
3065 LOG_DEBUG("REFUSED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3072 servMgr->IP_blacklist->unlock();
3073 LOG_DEBUG("ACCEPT: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3077 servMgr->IP_graylist->lock();
3079 if (servMgr->IP_graylist->find(clientAddr, &idx))
3082 ++(servMgr->IP_graylist->at(idx));
3083 LOG_DEBUG("UPDATE: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3087 servMgr->IP_graylist->push_back(clientAddr);
3088 LOG_DEBUG("GRAYED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3090 servMgr->IP_graylist->unlock();
3093 LOG_DEBUG("accepted incoming");
3094 Servent *ns = servMgr->allocServent();
3097 servMgr->lastIncoming = sys->getTime();
3098 ns->servPort = sv->sock->host.port;
3099 ns->networkID = servMgr->networkID;
3100 ns->initIncoming(cs,sv->allow);
3102 LOG_ERROR("Out of servents");
3107 }catch(StreamException &e)
3109 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3113 LOG_DEBUG("Server stopped");
3116 sys->endThread(thread);
3120 // -----------------------------------
// Thread entry point for the listening server; delegates to
// serverProcMain through the SEH_THREAD macro (from win32/seh.h).
3121 int Servent::serverProc(ThreadInfo *thread)
3123 SEH_THREAD(serverProcMain, Servent::serverProc);
3126 // -----------------------------------
3127 bool Servent::writeVariable(Stream &s, const String &var)
3132 strcpy(buf,getTypeStr());
3133 else if (var == "status")
3134 strcpy(buf,getStatusStr());
3135 else if (var == "address")
3137 if (servMgr->enableGetName) //JP-EX s
3139 getHost().toStr(buf);
3144 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3146 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3148 ChanHitList *chl = &chanMgr->hitlists[i];
3150 hits[numHits++] = chl;
3153 ishit = isfw = false;
3157 for(int k=0; k<numHits; k++)
3159 ChanHitList *chl = hits[k];
3162 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3164 ChanHit *hit = &chl->hits[j];
3165 if (hit->host.isValid() && (h.ip == hit->host.ip))
3168 if (hit->firewalled)
3170 numRelay += hit->numRelays;
3182 strcat(buf,"<font color=red>");
3184 strcat(buf,"<font color=orange>");
3187 strcat(buf,"<font color=green>");
3191 if (ClientSocket::getHostname(h_name,h.ip))
3199 strcat(buf,"</font>");
3205 bool isRelay = true;
3207 ChanHitList *chl = chanMgr->findHitListByID(chanID);
3209 ChanHit *hit = chl->hit;
3211 if (hit->host.isValid() && (h.ip == hit->host.ip)){
3212 isfw = hit->firewalled;
3213 isRelay = hit->relay;
3214 numRelay = hit->numRelays;
3223 strcat(buf,"<font color=red>");
3225 strcat(buf,"<font color=orange>");
3230 strcpy(buf,"<font color=purple>");
3232 strcpy(buf,"<font color=blue>");
3235 strcpy(buf,"<font color=green>");
3240 if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD (buffer-overflow countermeasure)
3246 strcat(buf,"</font>");
3249 getHost().toStr(buf);
3251 else if (var == "agent")
3252 strcpy(buf,agent.cstr());
3253 else if (var == "bitrate")
3257 unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3258 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3261 }else if (var == "uptime")
3265 uptime.setFromStopwatch(sys->getTime()-lastConnect);
3268 strcpy(buf,uptime.cstr());
3269 }else if (var.startsWith("gnet."))
3272 float ctime = (float)(sys->getTime()-lastConnect);
3273 if (var == "gnet.packetsIn")
3274 sprintf(buf,"%d",gnuStream.packetsIn);
3275 else if (var == "gnet.packetsInPerSec")
3276 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3277 else if (var == "gnet.packetsOut")
3278 sprintf(buf,"%d",gnuStream.packetsOut);
3279 else if (var == "gnet.packetsOutPerSec")
3280 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3281 else if (var == "gnet.normQueue")
3282 sprintf(buf,"%d",outPacketsNorm.numPending());
3283 else if (var == "gnet.priQueue")
3284 sprintf(buf,"%d",outPacketsPri.numPending());
3285 else if (var == "gnet.flowControl")
3286 sprintf(buf,"%d",flowControl?1:0);
3287 else if (var == "gnet.routeTime")
3289 int nr = seenIDs.numUsed();
3290 unsigned int tim = sys->getTime()-seenIDs.getOldest();
3293 tstr.setFromStopwatch(tim);
3296 strcpy(buf,tstr.cstr());