1 // ------------------------------------------------
6 // Channel streaming classes. These do the actual
7 // streaming of media between clients.
9 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
48 #include "chkMemoryLeak.h"
49 #define DEBUG_NEW new(__FILE__, __LINE__)
53 // -----------------------------------
// String table Channel::srcTypes; its initializer list is not visible in this excerpt.
54 char *Channel::srcTypes[]=
62 // -----------------------------------
// String table Channel::statusMsgs; its initializer list is not visible in this excerpt.
63 char *Channel::statusMsgs[]=
// Tests whether a channel description looks like an index.txt channel.
// The visible conditions are joined with && : raw content type, bitrate of at
// most 32, and a name of at least 9 characters that starts with "index" and
// ends with ".txt".
82 bool isIndexTxt(ChanInfo *info)
87 info->contentType == ChanInfo::T_RAW &&
88 info->bitrate <= 32 &&
// len is assigned inside the condition so the suffix comparison below can reuse it
89 (len = strlen(info->name.cstr())) >= 9 &&
90 !memcmp(info->name.cstr(), "index", 5) &&
// the length check above guarantees name+len-4 stays inside the string
91 !memcmp(info->name.cstr()+len-4, ".txt", 4))
// Channel overload: the visible test requires a non-null channel that is not
// broadcasting and whose info passes the ChanInfo overload of isIndexTxt.
101 bool isIndexTxt(Channel *ch)
103 if(ch && !ch->isBroadcasting() && isIndexTxt(&ch->info))
// Relay limit applied to index.txt channels: servMgr->maxRelaysIndexTxt,
// raised to 1 when the configured value is below 1.
// The ch argument is not used by the visible return statement.
109 int numMaxRelaysIndexTxt(Channel *ch)
111 return ((servMgr->maxRelaysIndexTxt < 1) ? 1 : servMgr->maxRelaysIndexTxt);
// Relay headroom for an index.txt channel. The visible lines of the body
// compute ret as numMaxRelaysIndexTxt(ch) minus ch->localRelays(), after a
// separate branch taken for a null or broadcasting channel.
114 int canStreamIndexTxt(Channel *ch)
118 // Not relevant when this node is itself broadcasting the channel
119 if(!ch || ch->isBroadcasting())
122 ret = numMaxRelaysIndexTxt(ch) - ch->localRelays();
130 // -----------------------------------
// Reads attribute arg of XML node n into str.
// The attribute text is stored as HTML-typed and then converted to ASCII.
131 void readXMLString(String &str, XML::Node *n, const char *arg)
134 p = n->findAttr(arg);
137 str.set(p,String::T_HTML);
138 str.convertTo(String::T_ASCII);
143 // -----------------------------------------------------------------------------
144 // Initialise the channel to its default settings of unallocated and reset.
145 // -----------------------------------------------------------------------------
150 channel_id = channel_count++;
152 // -----------------------------------------------------------------------------
// Ends this channel's thread. Visible steps: delete the channel from the
// channel manager while holding chanMgr->channellock, then ask sys to end the
// thread. How flg is used is not visible in this excerpt.
153 void Channel::endThread(bool flg)
177 chanMgr->channellock.on();
178 chanMgr->deleteChannel(this);
179 chanMgr->channellock.off();
181 sys->endThread(&thread);
184 // -----------------------------------------------------------------------------
// Stamps info.lastPlayStart with the current system time.
185 void Channel::resetPlayTime()
187 info.lastPlayStart = sys->getTime();
189 // -----------------------------------------------------------------------------
// Status change handler; s is the requested status.
// Visible side effects: info.status is set to S_PLAY or S_UNKNOWN,
// info.lastPlayEnd is time-stamped, a hit list is looked up or added for a
// broadcasting channel, and the application is notified via channelUpdate.
190 void Channel::setStatus(STATUS s)
// remember whether the channel was playing before the change
194 bool wasPlaying = isPlaying();
200 info.status = ChanInfo::S_PLAY;
// record when playback ended
205 info.lastPlayEnd = sys->getTime();
206 info.status = ChanInfo::S_UNKNOWN;
209 if (isBroadcasting())
211 ChanHitList *chl = chanMgr->findHitListByID(info.id);
// presumably only reached when no hit list was found - the guard is not visible
213 chanMgr->addHitList(info);
216 peercastApp->channelUpdate(&info);
221 // -----------------------------------------------------------------------------
222 // Reset channel and make it available
223 // -----------------------------------------------------------------------------
// Restores default field values. Defaults visible in this excerpt:
// stayConnected false, skipCount 0, rawData accepting HEAD and DATA packets,
// lastTrackerUpdate 0.
224 void Channel::reset()
237 stayConnected = false;
241 skipCount = 0; //JP-EX
// the raw buffer accepts only header and data packets
251 rawData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA;
263 lastTrackerUpdate = 0;
278 // -----------------------------------
// Takes a packet for this channel. On the visible path a packet whose type is
// not T_PCP is written into rawData (second argument true).
279 void Channel::newPacket(ChanPacket &pack)
281 if (pack.type != ChanPacket::T_PCP)
283 rawData.writePacket(pack,true);
288 // -----------------------------------
// Returns true when all of the following hold: uptime exceeds
// chanMgr->prefetchTime, there are no local listeners, stayConnected is not
// set, and the channel status is not S_BROADCASTING.
289 bool Channel::checkIdle()
291 return ( (info.getUptime() > chanMgr->prefetchTime) && (localListeners() == 0) && (!stayConnected) && (status != S_BROADCASTING));
294 // -----------------------------------
// Reports whether the channel has reached its relay limit.
// A PCRaw section consults canStreamIndexTxt first (its handling of ret is not
// visible here). The general rule: with a non-zero maxRelaysPerChannel the
// channel is full once localRelays() reaches it; a zero limit never reports full.
295 bool Channel::isFull()
297 // for PCRaw (relay) start.
300 int ret = canStreamIndexTxt(this);
307 // for PCRaw (relay) end.
309 return chanMgr->maxRelaysPerChannel ? localRelays() >= chanMgr->maxRelaysPerChannel : false;
311 // -----------------------------------
// Number of streams of type Servent::T_RELAY for this channel id, as counted
// by servMgr->numStreams (third argument true).
312 int Channel::localRelays()
314 return servMgr->numStreams(info.id,Servent::T_RELAY,true);
316 // -----------------------------------
// Number of streams of type Servent::T_DIRECT for this channel id, as counted
// by servMgr->numStreams (third argument true).
317 int Channel::localListeners()
319 return servMgr->numStreams(info.id,Servent::T_DIRECT,true);
322 // -----------------------------------
// Relay total for this channel: the hit list found by channel id contributes
// numHits() to tot. The initial value of tot and the return are not visible.
323 int Channel::totalRelays()
326 ChanHitList *chl = chanMgr->findHitListByID(info.id);
328 tot += chl->numHits();
331 // -----------------------------------
// Listener total for this channel: starts from localListeners() and adds the
// numListeners() of the hit list found by channel id.
332 int Channel::totalListeners()
334 int tot = localListeners();
335 ChanHitList *chl = chanMgr->findHitListByID(info.id);
337 tot += chl->numListeners();
344 // -----------------------------------
// Configures the channel to fetch from a PeerCast source: source type
// SRC_PEERCAST, source protocol SP_PCP, and a new PeercastSource as sourceData.
345 void Channel::startGet()
347 srcType = SRC_PEERCAST;
349 info.srcProtocol = ChanInfo::SP_PCP;
352 sourceData = new PeercastSource();
356 // -----------------------------------
// Configures the channel to read from URL u: stayConnected is switched on and
// a new URLSource built from u becomes sourceData.
357 void Channel::startURL(const char *u)
363 stayConnected = true;
367 sourceData = new URLSource(u);
372 // -----------------------------------
// Points the channel thread at Channel::stream and asks sys to start it.
// The handling of a failed start is not visible in this excerpt.
373 void Channel::startStream()
376 thread.func = stream;
377 if (!sys->startThread(&thread))
381 // -----------------------------------
// Sleeps until the given offset from startTime has been reached.
// sleepTime is the target minus the time elapsed since startTime; it is capped
// at 60 and multiplied by 1000 to obtain the value passed to sys->sleep.
382 void Channel::sleepUntil(double time)
384 double sleepTime = time - (sys->getDTime()-startTime);
386 // LOG("sleep %g",sleepTime);
// never sleep longer than 60 in one call
389 if (sleepTime > 60) sleepTime = 60;
391 double sleepMS = sleepTime*1000;
393 sys->sleep((int)sleepMS);
398 // -----------------------------------
// Derives a delay from the number of bytes read (len) and info.bitrate.
// NOTE(review): the divisor is zero when info.bitrate is 0 unless a guard sits
// on a line not shown here, and len*1000 is evaluated in unsigned int - confirm.
399 void Channel::checkReadDelay(unsigned int len)
403 unsigned int time = (len*1000)/((info.bitrate*1024)/8);
411 // -----------------------------------
// Thread entry point for a channel; thread->data carries the Channel.
// Loops while the thread is active, the application is not quitting and no
// finish was requested, running the channel source on each pass. Afterwards it
// either ends the thread or starts a helper thread running waitFinish.
412 THREAD_PROC Channel::stream(ThreadInfo *thread)
416 Channel *ch = (Channel *)thread->data;
418 LOG_CHANNEL("Channel started");
420 while (thread->active && !peercastInst->isQuitting && !thread->finish)
// look up the hit list; addHitList follows (presumably only when none was found)
422 ChanHitList *chl = chanMgr->findHitList(ch->info);
424 chanMgr->addHitList(ch->info);
426 ch->sourceData->stream(ch);
428 LOG_CHANNEL("Channel stopped");
// a channel without stayConnected deactivates its thread here
431 if (!ch->stayConnected)
433 thread->active = false;
437 if (!ch->info.lastPlayEnd)
438 ch->info.lastPlayEnd = sys->getTime();
// wait length: time elapsed since lastPlayEnd plus 5 (logged as seconds)
440 unsigned int diff = (sys->getTime()-ch->info.lastPlayEnd) + 5;
442 LOG_DEBUG("Channel sleeping for %d seconds",diff);
443 for(unsigned int i=0; i<diff; i++)
445 if (ch->info.lastPlayEnd == 0) // reconnected
447 if (!thread->active || peercastInst->isQuitting){
448 thread->active = false;
456 LOG_DEBUG("thread.active = %d, thread.finish = %d",
457 ch->thread.active, ch->thread.finish);
459 if (!thread->finish){
460 ch->endThread(false);
// presumably the else branch: hand over to a helper thread running waitFinish
463 ch->finthread = new ThreadInfo();
464 ch->finthread->func = waitFinish;
465 ch->finthread->data = ch;
466 sys->startThread(ch->finthread);
474 // -----------------------------------
// Helper thread body; thread->data carries the Channel.
// Waits while neither the channel thread nor this helper thread has its finish
// flag set, then logs whether the channel finished or is restarting.
475 THREAD_PROC Channel::waitFinish(ThreadInfo *thread)
477 Channel *ch = (Channel*)thread->data;
478 LOG_DEBUG("Wait channel finish");
480 while(!(ch->thread.finish) && !thread->finish){
484 if (ch->thread.finish){
485 LOG_DEBUG("channel finish");
488 LOG_DEBUG("channel restart");
495 // -----------------------------------
// Takes a client socket named givSock and returns a bool; the body is not
// visible in this excerpt.
496 bool Channel::acceptGIV(ClientSocket *givSock)
505 // -----------------------------------
// Creates the channel socket and opens it to sourceHost.host.
// Throws StreamException when the socket cannot be created (the guarding
// condition is on a line not shown). Read and write timeouts are 30000 for a
// tracker or YP source; the 5000 values presumably belong to the else branch.
506 void Channel::connectFetch()
508 sock = sys->createSocket();
511 throw StreamException("Can`t create socket");
513 if (sourceHost.tracker || sourceHost.yp)
515 sock->setReadTimeout(30000);
516 sock->setWriteTimeout(30000);
517 LOG_CHANNEL("Channel using longer timeouts");
519 sock->setReadTimeout(5000);
520 sock->setWriteTimeout(5000);
523 sock->open(sourceHost.host);
528 // -----------------------------------
// Performs the request handshake for fetching this channel over sock.
// Sends a GET for /channel/<id> followed by PCX header lines (position, PCP
// flag, port), reads the response code and headers, and runs the outgoing PCP
// handshake when the source protocol is PCP.
// Visible return: 503 when the response code was 503.
529 int Channel::handshakeFetch()
532 info.id.toStr(idStr);
535 servMgr->sessionID.toStr(sidStr);
537 sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
538 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
539 sock->writeLineF("%s %d",PCX_HS_PCP,1);
540 sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
546 int r = http.readResponse();
548 LOG_CHANNEL("Got response: %d",r);
550 while (http.nextHeader())
552 char *arg = http.getArgStr();
// the remote side may supply the stream position to use
556 if (http.isHeader(PCX_HS_POS))
557 streamPos = atoi(arg);
559 Servent::readICYHeader(http, info, NULL);
561 LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
// response codes other than 200 and 503 take a separate branch (body not visible)
564 if ((r != 200) && (r != 503))
567 if (!servMgr->keepDownstreams) {
568 if (rawData.getLatestPos() > streamPos)
572 AtomStream atom(*sock);
576 Host rhost = sock->host;
578 if (info.srcProtocol == ChanInfo::SP_PCP)
580 // don`t need PCP_CONNECT here
// last argument: whether the source is a YP or tracker host
581 Servent::handshakeOutgoingPCP(atom,rhost,remoteID,agent,sourceHost.yp|sourceHost.tracker);
584 if (r == 503) return 503;
589 // -----------------------------------
// Main loop of a PeerCast-sourced channel.
// While the channel thread is active, each pass selects a source host, connects
// and handshakes, reads the stream, and afterwards informs downstream relays.
// Source selection order visible here: pushed socket, local hit, global hit,
// local tracker, global tracker, saved tracker, then the YP root hosts.
// Many interior lines (declarations, braces, try blocks) are not visible.
590 void PeercastSource::stream(Channel *ch)
594 bool next_yp = false;
// a previously saved tracker hit enables the tracker path from the start
595 bool tracker_check = (ch->trackerHit.host.ip != 0);
598 ch->lastStopTime = 0;
601 while (ch->thread.active)
603 ch->skipCount = 0; //JP-EX
604 ch->lastSkipTime = 0;
606 ChanHitList *chl = NULL;
608 ch->sourceHost.init();
// after 3 or more connection failures with no local listeners, go idle
610 if (connFailCnt >= 3 && (ch->localListeners() == 0) && (!ch->stayConnected) && !ch->isBroadcasting()) {
611 ch->lastIdleTime = sys->getTime();
612 ch->setStatus(Channel::S_IDLE);
614 ch->lastSkipTime = 0;
618 if (!servMgr->keepDownstreams && !ch->bumped) {
// back-date the tracker contact time by a random amount of up to 30
619 ch->trackerHit.lastContact = sys->getTime() - 30 + (rand() % 30);
622 ch->setStatus(Channel::S_SEARCHING);
623 LOG_CHANNEL("Channel searching for hit..");
// a pushed socket, when used, supplies both the socket and the source host
629 ch->sock = ch->pushSock;
631 ch->sourceHost.host = ch->sock->host;
// hit list queries below run under hitlistlock
635 chanMgr->hitlistlock.on();
637 chl = chanMgr->findHitList(ch->info);
643 if (!ch->sourceHost.host.ip){
645 chs.matchHost = servMgr->serverHost;
646 chs.waitDelay = MIN_RELAY_RETRY;
647 chs.excludeID = servMgr->sessionID;
648 if (chl->pickSourceHits(chs)){
649 ch->sourceHost = chs.best[0];
650 LOG_DEBUG("use local hit");
654 // else find global hit
655 if (!ch->sourceHost.host.ip)
658 chs.waitDelay = MIN_RELAY_RETRY;
659 chs.excludeID = servMgr->sessionID;
660 if (chl->pickSourceHits(chs)){
661 ch->sourceHost = chs.best[0];
662 LOG_DEBUG("use global hit");
667 // else find local tracker
668 if (!ch->sourceHost.host.ip)
671 chs.matchHost = servMgr->serverHost;
672 chs.waitDelay = MIN_TRACKER_RETRY;
673 chs.excludeID = servMgr->sessionID;
674 chs.trackersOnly = true;
675 if (chl->pickSourceHits(chs)){
676 ch->sourceHost = chs.best[0];
677 LOG_DEBUG("use local tracker");
681 // else find global tracker
682 if (!ch->sourceHost.host.ip)
685 chs.waitDelay = MIN_TRACKER_RETRY;
686 chs.excludeID = servMgr->sessionID;
687 chs.trackersOnly = true;
688 if (chl->pickSourceHits(chs)){
689 ch->sourceHost = chs.best[0];
// a global tracker is also remembered as the saved tracker hit
690 tracker_check = true;
691 ch->trackerHit = chs.best[0];
692 LOG_DEBUG("use global tracker");
697 unsigned int ctime = sys->getTime();
// fall back to the saved tracker, at most once per 30 time units
698 if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
699 if (ch->trackerHit.lastContact + 30 < ctime){
700 ch->sourceHost = ch->trackerHit;
701 ch->trackerHit.lastContact = ctime;
702 LOG_DEBUG("use saved tracker");
707 chanMgr->hitlistlock.off();
// with keepDownstreams: once the stream has been stopped for more than 7,
// send a quit to all downstream relays and clear the hit list
709 if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
710 ch->lastStopTime = 0;
711 LOG_DEBUG("------------ disconnect all downstreams");
713 MemoryStream mem(pack.data,sizeof(pack.data));
714 AtomStream atom(mem);
715 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
717 pack.type = ChanPacket::T_PCP;
720 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
722 chanMgr->hitlistlock.on();
723 ChanHitList *hl = chanMgr->findHitList(ch->info);
725 hl->clearHits(false);
727 chanMgr->hitlistlock.off();
730 // no trackers found so contact YP
731 if (!tracker_check && !ch->sourceHost.host.ip)
734 if (servMgr->rootHost.isEmpty())
740 if ((!servMgr->rootHost2.isEmpty()) && (numYPTries > numYPTries2))
743 unsigned int ctime=sys->getTime();
// YP contacts are rate limited by MIN_YP_RETRY
744 if ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY)
746 ch->sourceHost.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
747 ch->sourceHost.yp = true;
748 chanMgr->lastYPConnect=ctime;
756 // no trackers found so contact YP2
757 if (!tracker_check && !ch->sourceHost.host.ip)
760 if (servMgr->rootHost2.isEmpty())
763 if (numYPTries2 >= 3)
766 unsigned int ctime=sys->getTime();
767 if ((ctime-chanMgr->lastYPConnect2) > MIN_YP_RETRY)
769 ch->sourceHost.host.fromStrName(servMgr->rootHost2.cstr(),DEFAULT_PORT);
770 ch->sourceHost.yp = true;
771 chanMgr->lastYPConnect2=ctime;
778 if (!tracker_check && !ch->sourceHost.host.ip && !next_yp) break;
// keep searching until a host is found or the thread is deactivated
782 }while((ch->sourceHost.host.ip==0) && (ch->thread.active));
784 if (!ch->sourceHost.host.ip)
786 LOG_ERROR("Channel giving up");
787 ch->setStatus(Channel::S_ERROR);
791 if (ch->sourceHost.yp)
793 LOG_CHANNEL("Channel contacting YP, try %d",numYPTries);
796 LOG_CHANNEL("Channel found hit");
801 if (ch->sourceHost.host.ip)
803 bool isTrusted = ch->sourceHost.tracker | ch->sourceHost.yp;
806 //if (ch->sourceHost.tracker)
807 // peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Contacting tracker, please wait...");
810 ch->sourceHost.host.toStr(ipstr);
813 if (ch->sourceHost.tracker)
815 else if (ch->sourceHost.yp)
822 ch->setStatus(Channel::S_CONNECTING);
823 ch->sourceHost.lastContact = sys->getTime();
827 LOG_CHANNEL("Channel connecting to %s %s",ipstr,type);
831 error = ch->handshakeFetch();
837 throw StreamException("Handshake error");
// a completed tracker handshake resets the failure counter
838 if (ch->sourceHost.tracker) connFailCnt = 0;
// automatic relay limit: derived from maxBitrateOut and 1.3 times the channel
// bitrate, kept between 1 and autoMaxRelaySetting
840 if (servMgr->autoMaxRelaySetting) //JP-EX
842 double setMaxRelays = ch->info.bitrate?servMgr->maxBitrateOut/(ch->info.bitrate*1.3):0;
843 if ((unsigned int)setMaxRelays == 0)
844 servMgr->maxRelays = 1;
845 else if ((unsigned int)setMaxRelays > servMgr->autoMaxRelaySetting)
846 servMgr->maxRelays = servMgr->autoMaxRelaySetting;
848 servMgr->maxRelays = (unsigned int)setMaxRelays;
851 ch->sourceStream = ch->createSource();
853 error = ch->readStream(*ch->sock,ch->sourceStream);
855 throw StreamException("Stream error");
857 error = 0; // no errors, closing normally.
858 // ch->setStatus(Channel::S_CLOSING);
859 ch->setStatus(Channel::S_IDLE);
861 LOG_CHANNEL("Channel closed normally");
862 }catch(StreamException &e)
864 ch->setStatus(Channel::S_ERROR);
865 LOG_ERROR("Channel to %s %s : %s",ipstr,type,e.msg);
866 if (!servMgr->allowConnectPCST) //JP-EX
868 if (ch->info.srcProtocol == ChanInfo::SP_PEERCAST)
869 ch->thread.active = false;
871 //if (!ch->sourceHost.tracker || ((error != 503) && ch->sourceHost.tracker))
// mark the host dead unless it is a tracker that answered 503
872 if (!ch->sourceHost.tracker || (!got503 && ch->sourceHost.tracker))
873 chanMgr->deadHit(ch->sourceHost);
874 if (ch->sourceHost.tracker && error == -1) {
875 LOG_ERROR("can't connect to tracker");
880 unsigned int ctime = sys->getTime();
// remember when data was last written; for index.txt channels a stop within
// the last 60 is re-stamped with the current time
881 if (ch->rawData.lastWriteTime) {
882 ch->lastStopTime = ch->rawData.lastWriteTime;
883 if (isIndexTxt(ch) && ctime - ch->lastStopTime < 60)
884 ch->lastStopTime = ctime;
887 if (tracker_check && ch->sourceHost.tracker)
888 ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
890 // broadcast source host
891 if (!error && ch->sourceHost.host.ip) { // if closed normally
893 MemoryStream mem(pack.data,sizeof(pack.data));
894 AtomStream atom(mem);
895 ch->sourceHost.writeAtoms(atom, ch->info.id);
897 pack.type = ChanPacket::T_PCP;
900 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
901 LOG_DEBUG("stream: broadcast sourceHost");
904 // broadcast quit to any connected downstream servents
905 if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
907 MemoryStream mem(pack.data,sizeof(pack.data));
908 AtomStream atom(mem);
909 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
911 pack.type = ChanPacket::T_PCP;
914 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
915 LOG_DEBUG("------------ broadcast quit to all downstreams");
917 chanMgr->hitlistlock.on();
918 ChanHitList *hl = chanMgr->findHitList(ch->info);
920 hl->clearHits(false);
922 chanMgr->hitlistlock.off();
// final status update and flush on the source stream before it is detached
926 if (ch->sourceStream)
932 ch->sourceStream->updateStatus(ch);
933 ch->sourceStream->flush(*ch->sock);
935 }catch(StreamException &)
937 ChannelStream *cs = ch->sourceStream;
938 ch->sourceStream = NULL;
952 LOG_ERROR("Channel not found");
954 if ((ch->sourceHost.yp && !next_yp) || ch->sourceHost.tracker) {
955 chanMgr->hitlistlock.on();
956 ChanHitList *hl = chanMgr->findHitList(ch->info);
960 chanMgr->hitlistlock.off();
// index.txt channels do not raise the user-visible notification
962 if(!isIndexTxt(&ch->info)) // for PCRaw (popup)
963 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
971 ch->lastIdleTime = sys->getTime();
972 ch->setStatus(Channel::S_IDLE);
973 ch->skipCount = 0; //JP-EX
974 ch->lastSkipTime = 0;
// stay in the idle state while checkIdle() holds and the thread is active
975 while ((ch->checkIdle()) && (ch->thread.active))
984 // -----------------------------------
// Configures the channel to read from client socket cs as an ICY source.
// Visible steps: read timeout 0 on cs, source protocol SP_HTTP, a fresh
// streamIndex taken from chanMgr->icyIndex, and a new ICYSource as sourceData.
// How st is used is not visible in this excerpt.
985 void Channel::startICY(ClientSocket *cs, SRC_TYPE st)
989 cs->setReadTimeout(0); // stay connected even when theres no data coming through
991 info.srcProtocol = ChanInfo::SP_HTTP;
993 streamIndex = ++chanMgr->icyIndex;
995 sourceData = new ICYSource();
999 // -----------------------------------
// File-local helper taking a string and a delimiter character and returning a
// char pointer; the body is not visible in this excerpt.
1000 static char *nextMetaPart(char *str,char delim)
1013 // -----------------------------------
// File-local string copy helper bounded by max.
// The loop reads characters from from until a zero character is read or the
// pre-decremented max reaches zero; the loop body is not visible.
1014 static void copyStr(char *to,char *from,int max)
1017 while ((c=*from++) && (--max))
1024 // -----------------------------------
// Parses metadata text of the form cmd=arg;... and applies it to the channel.
// StreamTitle sets the track title and StreamUrl sets the track contact, each
// unquoted as ASCII and converted to Unicode. Changes are made on a copy of
// info and handed to updateInfo.
1025 void Channel::processMp3Metadata(char *str)
1027 ChanInfo newInfo = info;
// split at = to obtain the argument, then at ; to obtain the next entry
1032 char *arg = nextMetaPart(cmd,'=');
1036 char *next = nextMetaPart(arg,';');
1038 if (strcmp(cmd,"StreamTitle")==0)
1040 newInfo.track.title.setUnquote(arg,String::T_ASCII);
1041 newInfo.track.title.convertTo(String::T_UNICODE);
1043 }else if (strcmp(cmd,"StreamUrl")==0)
1045 newInfo.track.contact.setUnquote(arg,String::T_ASCII);
1046 newInfo.track.contact.convertTo(String::T_UNICODE);
1053 updateInfo(newInfo);
1056 // -----------------------------------
// Creates a new host XML node describing this hit. The format string names
// the attributes; of the arguments only the value sys->getTime()-time is
// visible here. The caller receives a node allocated with new.
1057 XML::Node *ChanHit::createXML()
1063 return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" version=\"%d\" update=\"%d\" tracker=\"%d\"",
1075 sys->getTime()-time,
1081 // -----------------------------------
// Creates a hits XML node summarising this hit list; the last format argument
// is the time since the newest hit. Child host nodes are added through
// h->createXML(), presumably only when addHits is set - the guard is not visible.
1082 XML::Node *ChanHitList::createXML(bool addHits)
1084 XML::Node *hn = new XML::Node("hits hosts=\"%d\" listeners=\"%d\" relays=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",
1091 sys->getTime()-newestHit()
1100 hn->add(h->createXML());
1108 // -----------------------------------
// Creates a relay XML node for this channel with listener, relay and host
// counts and a status string. The host count is the hit list numHits(), or 0
// when no hit list exists. How showStat is used is not visible here.
1109 XML::Node *Channel::createRelayXML(bool showStat)
1112 ststr = getStatusStr();
// receiving and broadcasting states get special treatment (body not visible)
1114 if ((status == S_RECEIVING) || (status == S_BROADCASTING))
1117 ChanHitList *chl = chanMgr->findHitList(info);
1119 return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",
1122 (chl!=NULL)?chl->numHits():0,
1127 // -----------------------------------
// Fills this meta buffer from an XML document. The visible line wraps data in a
// MemoryStream limited to MAX_DATALEN; the write itself is not visible.
1128 void ChanMeta::fromXML(XML &xml)
1130 MemoryStream tout(data,MAX_DATALEN);
1135 // -----------------------------------
// Takes a memory pointer p and a length l; the body is not visible in this excerpt.
1136 void ChanMeta::fromMem(void *p, int l)
1141 // -----------------------------------
// Appends l bytes from p at offset len of data, but only when the combined
// length still fits within MAX_DATALEN. The update of len is not visible here.
1142 void ChanMeta::addMem(void *p, int l)
1144 if ((len+l) <= MAX_DATALEN)
1146 memcpy(data+len,p,l);
1150 // -----------------------------------
// Builds a PCP broadcast packet describing this channel (info, track and local
// hit data) and sends it to servents of type T_COUT.
// svID is passed through to servMgr->broadcastPacket; force bypasses the rate
// limit. Throws StreamException when the channel has no hit list (the guarding
// condition is on a line not shown).
1151 void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
1153 unsigned int ctime = sys->getTime();
// rate limit: proceed only when more than 30 has elapsed since the last update, or when forced
1155 if (((ctime-lastTrackerUpdate) > 30) || (force))
// Bound the stream by the payload array rather than the whole packet object so
// that atom writes cannot run past the end of pack.data.
1159 MemoryStream mem(pack.data,sizeof(pack.data));
1161 AtomStream atom(mem);
1165 ChanHitList *chl = chanMgr->findHitListByID(info.id);
1167 throw StreamException("Broadcast channel has no hitlist");
1169 int numListeners = totalListeners();
1170 int numRelays = totalRelays();
1172 unsigned int oldp = rawData.getOldestPos();
1173 unsigned int newp = rawData.getLatestPos();
1175 hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying(), false, 0, this, oldp,newp);
// two alternative child counts for the BCST parent; the selecting condition is not visible
1179 atom.writeParent(PCP_BCST,8);
1181 atom.writeParent(PCP_BCST,10);
1183 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
1184 atom.writeChar(PCP_BCST_HOPS,0);
1185 atom.writeChar(PCP_BCST_TTL,11);
1186 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1187 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1188 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1190 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1191 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1193 atom.writeParent(PCP_CHAN,4);
1194 atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
1195 atom.writeBytes(PCP_CHAN_BCID,chanMgr->broadcastID.id,16);
1196 info.writeInfoAtoms(atom);
1197 info.writeTrackAtoms(atom);
1198 hit.writeAtoms(atom,info.id);
1202 pack.type = ChanPacket::T_PCP;
1206 int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);
1210 LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
// restart the rate-limit window
1211 lastTrackerUpdate = ctime;
1216 // -----------------------------------
// Forwards pack to the source stream with destination id did.
// Visible preconditions: cid is unset or matches this channel id, and sid is
// unset or differs from remoteID. Further conditions are on lines not shown.
1217 bool Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
1220 && (!cid.isSet() || info.id.isSame(cid))
1221 && (!sid.isSet() || !remoteID.isSame(sid))
1224 return sourceStream->sendPacket(pack,did);
1229 // -----------------------------------
// Merges newInfo into the channel info. When info.update reports a change and
// the channel is broadcasting, a PCP broadcast carrying the info and track
// atoms is sent to relays (at most once per 30 time units) followed by a
// tracker update; the application is then notified through channelUpdate.
1230 void Channel::updateInfo(ChanInfo &newInfo)
1232 if (info.update(newInfo))
1234 if (isBroadcasting())
1236 unsigned int ctime = sys->getTime();
// rate limit metadata broadcasts
1237 if ((ctime-lastMetaUpdate) > 30)
1239 lastMetaUpdate = ctime;
// Bound the stream by the payload array rather than the whole packet object so
// that atom writes cannot run past the end of pack.data.
1243 MemoryStream mem(pack.data,sizeof(pack.data));
1245 AtomStream atom(mem);
// two alternative child counts for the BCST parent; the selecting condition is not visible
1248 atom.writeParent(PCP_BCST,8);
1250 atom.writeParent(PCP_BCST,10);
1252 atom.writeChar(PCP_BCST_HOPS,0);
1253 atom.writeChar(PCP_BCST_TTL,7);
1254 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
1255 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1256 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1257 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1259 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1260 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1262 atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
1263 atom.writeParent(PCP_CHAN,3);
1264 atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
1265 info.writeInfoAtoms(atom);
1266 info.writeTrackAtoms(atom);
1269 pack.type = ChanPacket::T_PCP;
1272 servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);
1274 broadcastTrackerUpdate(noID);
1278 ChanHitList *chl = chanMgr->findHitList(info);
1282 peercastApp->channelUpdate(&info);
1287 // -----------------------------------
// Creates the ChannelStream implementation for this channel. Selection is by
// info.srcProtocol first (PEERCAST, PCP, MMS) and by info.contentType otherwise
// (MP3, NSV, OGG/OGM, with a raw stream as the remaining case).
// Throws StreamException for WMA/WMV content that is not MMS, and with the
// message "Channel is not allowed" on the PeerCast path (presumably when
// servMgr->allowConnectPCST is off - the else is not visible).
// The created stream gets this channel as its parent.
1288 ChannelStream *Channel::createSource()
1290 // if (servMgr->relayBroadcast)
1291 // chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);
1294 ChannelStream *source=NULL;
1296 if (info.srcProtocol == ChanInfo::SP_PEERCAST)
1298 LOG_CHANNEL("Channel is Peercast");
1299 if (servMgr->allowConnectPCST) //JP-EX
1300 source = new PeercastStream();
1302 throw StreamException("Channel is not allowed");
1304 else if (info.srcProtocol == ChanInfo::SP_PCP)
1306 LOG_CHANNEL("Channel is PCP");
1307 PCPStream *pcp = new PCPStream(remoteID);
1310 else if (info.srcProtocol == ChanInfo::SP_MMS)
1312 LOG_CHANNEL("Channel is MMS");
1313 source = new MMSStream();
// no protocol-specific stream: choose by content type
1316 switch(info.contentType)
1318 case ChanInfo::T_MP3:
1319 LOG_CHANNEL("Channel is MP3 - meta: %d",icyMetaInterval);
1320 source = new MP3Stream();
1322 case ChanInfo::T_NSV:
1323 LOG_CHANNEL("Channel is NSV");
1324 source = new NSVStream();
1326 case ChanInfo::T_WMA:
1327 case ChanInfo::T_WMV:
1328 throw StreamException("Channel is WMA/WMV - but not MMS");
1330 case ChanInfo::T_OGG:
1331 case ChanInfo::T_OGM:
1332 LOG_CHANNEL("Channel is OGG");
1333 source = new OGGStream();
1336 LOG_CHANNEL("Channel is Raw");
1337 source = new RawStream();
1342 source->parent = this;
1346 // ------------------------------------------
// Sends a channel status update upstream when getStatus produced a packet and
// the channel is not broadcasting; the number of recipients is logged.
1347 void ChannelStream::updateStatus(Channel *ch)
1350 if (getStatus(ch,pack))
1352 if (!ch->isBroadcasting())
1356 int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
1357 LOG_CHANNEL("Sent channel status update to %d clients",cnt);
1362 // ------------------------------------------
// Builds a host status broadcast for channel ch into pack.
// Checks against lastUpdate and lastCheckTime use a threshold of 10 while the
// playing state is unchanged (the branch bodies are not visible). A packet is
// written when the update interval elapsed or when listeners, relays, playing
// state, firewall state or any of the relay/full display flags changed.
// The return statements are not visible in this excerpt.
1363 bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
1365 unsigned int ctime = sys->getTime();
1367 ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
// the block comment below holds an older, disabled version of the change test
1372 /* int newLocalListeners = ch->localListeners();
1373 int newLocalRelays = ch->localRelays();
1377 (numListeners != newLocalListeners)
1378 || (numRelays != newLocalRelays)
1379 || (ch->isPlaying() != isPlaying)
1380 || (servMgr->getFirewall() != fwState)
1381 || (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
1383 && ((ctime-lastUpdate) > 10)
1387 numListeners = newLocalListeners;
1388 numRelays = newLocalRelays;
1389 isPlaying = ch->isPlaying();
1390 fwState = servMgr->getFirewall();
1395 hit.initLocal(ch->localListeners(),ch->localRelays(),ch->info.numSkips,ch->info.getUptime(),isPlaying, ch->isFull(), ch->info.bitrate, ch);
1396 hit.tracker = ch->isBroadcasting();*/
1398 int newLocalListeners = ch->localListeners();
1399 int newLocalRelays = ch->localRelays();
// throttling applies only while the playing state is unchanged
1401 if ((ch->isPlaying() == isPlaying)){
1402 if ((ctime-lastUpdate) < 10){
1406 if ((ctime-lastCheckTime) < 10){
1409 lastCheckTime = ctime;
1412 unsigned int oldp = ch->rawData.getOldestPos();
1413 unsigned int newp = ch->rawData.getLatestPos();
1417 // LOG_DEBUG("isPlaying-------------------------------------- %d %d", ch->isPlaying(), isPlaying);
// describe the local node as a hit; it counts as a tracker when broadcasting
1419 hit.initLocal(newLocalListeners,newLocalRelays,ch->info.numSkips,ch->info.getUptime(),ch->isPlaying(), ch->isFull(), ch->info.bitrate, ch, oldp, newp);
1420 hit.tracker = ch->isBroadcasting();
// a zero hostUpdateInterval disables the periodic trigger
1422 if ( (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
1423 || (newLocalListeners != numListeners)
1424 || (newLocalRelays != numRelays)
1425 || (ch->isPlaying() != isPlaying)
1426 || (servMgr->getFirewall() != fwState)
1427 || (ch->chDisp.relay != hit.relay)
1428 || (ch->chDisp.relayfull != hit.relayfull)
1429 || (ch->chDisp.chfull != hit.chfull)
1430 || (ch->chDisp.ratefull != hit.ratefull)
// remember the state that was just reported
1432 numListeners = newLocalListeners;
1433 numRelays = newLocalRelays;
1434 isPlaying = ch->isPlaying();
1435 fwState = servMgr->getFirewall();
// automatic relay keep: stay connected while relays exist and the firewall is
// off; release again when nobody is attached and autoRelayKeep is 2
1440 if ((numRelays) && ((servMgr->getFirewall() == ServMgr::FW_OFF) && (servMgr->autoRelayKeep!=0))) //JP-EX
1441 ch->stayConnected = true;
1443 if ((!numRelays && !numListeners) && (servMgr->autoRelayKeep==2)) //JP-EX
1444 ch->stayConnected = false;
1446 MemoryStream pmem(pack.data,sizeof(pack.data));
1447 AtomStream atom(pmem);
// two alternative child counts for the BCST parent; the selecting condition is not visible
1453 atom.writeParent(PCP_BCST,8);
1455 atom.writeParent(PCP_BCST,10);
1457 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
1458 atom.writeChar(PCP_BCST_HOPS,0);
1459 atom.writeChar(PCP_BCST_TTL,11);
1460 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1461 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1462 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1464 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1465 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1467 atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
1468 hit.writeAtoms(atom,noID);
// the packet length is the number of bytes written to the stream
1470 pack.len = pmem.pos;
1471 pack.type = ChanPacket::T_PCP;
1476 // -----------------------------------
// Detects a stalled relay source. For a channel that is neither broadcasting
// nor sourced from a tracker, the visible branch is entered when data has been
// written before and the time since the last write exceeds maxIdleTime
// (30, or 60 for index.txt channels); it logs an auto bump.
// The return statements are not visible in this excerpt.
1477 bool Channel::checkBump()
1479 unsigned int maxIdleTime = 30;
1480 if (isIndexTxt(this)) maxIdleTime = 60;
1482 if (!isBroadcasting() && (!sourceHost.tracker))
1483 if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime))
1485 LOG_ERROR("Channel Auto bumped");
1497 // -----------------------------------
// Reads the channel stream from in through source until the thread is
// deactivated or the application quits. Visible steps: read the header,
// announce channelStart, then loop reading packets, sending tracker updates
// while broadcasting, switching the status to S_RECEIVING and flushing data;
// on exit announce channelStop and call readEnd on the source.
// The returned value is not visible in this excerpt.
1498 int Channel::readStream(Stream &in,ChannelStream *source)
1506 source->readHeader(in,this);
1508 peercastApp->channelStart(&info);
1510 rawData.lastWriteTime = 0;
1512 bool wasBroadcasting=false;
1514 unsigned int receiveStartTime = 0;
1516 unsigned int ptime = 0;
1517 unsigned int upsize = 0;
1521 while (thread.active && !peercastInst->isQuitting)
1525 LOG_DEBUG("Channel idle");
1531 LOG_DEBUG("Channel bumped");
1539 LOG_DEBUG("Channel eof");
1545 error = source->readPacket(in,this);
1550 //if (rawData.writePos > 0)
// act only once something was written or skipped
1551 if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
1553 if (isBroadcasting())
// periodic tracker update while broadcasting
1555 if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
1559 broadcastTrackerUpdate(noID);
1561 wasBroadcasting = true;
// disabled code follows; the closing marker of this block comment is on a line not shown
1565 /* if (status != Channel::S_RECEIVING){
1566 receiveStartTime = sys->getTime();
1567 } else if (receiveStartTime && receiveStartTime + 10 > sys->getTime()){
1568 chanMgr->hitlistlock.on();
1569 ChanHitList *hl = chanMgr->findHitList(info);
1571 hl->clearHits(true);
1573 chanMgr->hitlistlock.off();
1574 receiveStartTime = 0;
1576 setStatus(Channel::S_RECEIVING);
1579 source->updateStatus(this);
1583 unsigned int t = sys->getTime();
1586 upsize = Servent::MAX_OUTWARD_SIZE;
1589 unsigned int len = source->flushUb(in, upsize);
1594 }catch(StreamException &e)
1596 LOG_ERROR("readStream: %s",e.msg);
1600 if (!servMgr->keepDownstreams) {
1601 if (status == Channel::S_RECEIVING){
1602 chanMgr->hitlistlock.on();
1603 ChanHitList *hl = chanMgr->findHitList(info);
1605 hl->clearHits(false);
1607 chanMgr->hitlistlock.off();
1611 // setStatus(S_CLOSING);
1614 if (wasBroadcasting)
1618 broadcastTrackerUpdate(noID,true);
1621 peercastApp->channelStop(&info);
1623 source->readEnd(in,this);
1628 // -----------------------------------
// Checks that the stream starts with the PCST tag; throws StreamException
// with the message "Not PeerCast stream" otherwise.
1629 void PeercastStream::readHeader(Stream &in,Channel *ch)
1631 if (in.readTag() != 'PCST')
1632 throw StreamException("Not PeerCast stream");
1635 // -----------------------------------
// End-of-stream hook; both parameters are unnamed and the body is not visible
// in this excerpt.
1636 void PeercastStream::readEnd(Stream &,Channel *)
1640 // -----------------------------------
// Reads one PeerCast-format packet from in and handles it by type.
// HEAD and DATA packets are stamped with the current stream position, handed
// to the channel, and advance streamPos by their length (HEAD is also kept as
// headPack). META packets fill insertMeta and update the channel info from the
// channel XML node. SYNC packets are compared with syncPos; a gap is logged and
// counted, and more than 50 skips throws StreamException.
// The returned value is not visible in this excerpt.
1641 int PeercastStream::readPacket(Stream &in,Channel *ch)
1647 pack.readPeercast(in);
1649 MemoryStream mem(pack.data,pack.len);
1653 case ChanPacket::T_HEAD:
1655 ch->headPack = pack;
1656 pack.pos = ch->streamPos;
1657 ch->newPacket(pack);
1658 ch->streamPos+=pack.len;
1660 case ChanPacket::T_DATA:
1661 pack.pos = ch->streamPos;
1662 ch->newPacket(pack);
1663 ch->streamPos+=pack.len;
1665 case ChanPacket::T_META:
1666 ch->insertMeta.fromMem(pack.data,pack.len);
1672 XML::Node *n = xml.findNode("channel");
1675 ChanInfo newInfo = ch->info;
1676 newInfo.updateFromXML(n);
1677 ChanHitList *chl = chanMgr->findHitList(ch->info);
// NOTE(review): updateFromXML is applied to newInfo a second time here - confirm whether intended
1679 newInfo.updateFromXML(n);
1680 ch->updateInfo(newInfo);
1686 case ChanPacket::T_SYNC:
1688 unsigned int s = mem.readLong();
// consecutive sync values differ by exactly 1; anything else is a skip
1689 if ((s-ch->syncPos) != 1)
1691 LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips);
1694 ch->info.numSkips++;
1695 if (ch->info.numSkips>50)
1696 throw StreamException("Bumped - Too many skips");
1711 // -----------------------------------
// Reads one fixed-size chunk of 8192 bytes from in as a DATA packet positioned
// at the current stream position, hands it to the channel, applies the read
// delay for its length and advances streamPos.
1712 void ChannelStream::readRaw(Stream &in, Channel *ch)
1716 const int readLen = 8192;
1718 pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos);
1719 in.read(pack.data,pack.len);
1720 ch->newPacket(pack);
1721 ch->checkReadDelay(pack.len);
1723 ch->streamPos+=pack.len;
1726 // ------------------------------------------
// Header hook for raw streams; both parameters are unnamed and the body is not
// visible in this excerpt.
1727 void RawStream::readHeader(Stream &,Channel *)
1731 // ------------------------------------------
// Packet read hook for raw streams, taking the input stream and the channel;
// the body is not visible in this excerpt.
1732 int RawStream::readPacket(Stream &in,Channel *ch)
1738 // ------------------------------------------
// End-of-stream hook for raw streams; both parameters are unnamed and the body
// is not visible in this excerpt.
1739 void RawStream::readEnd(Stream &,Channel *)
1745 // -----------------------------------
// Initialises this packet from a ChanPacketv. Throws StreamException when len
// exceeds MAX_DATALEN; copies the priority and len bytes of data from p.
// The assignment of len and the other fields is on lines not shown.
1746 void ChanPacket::init(ChanPacketv &p)
1750 if (len > MAX_DATALEN)
1751 throw StreamException("Packet data too large");
1755 priority = p.priority;
1756 memcpy(data, p.data, len);
1758 // -----------------------------------
// Initialises this packet from a type, a source buffer p of length l and a
// stream position. Throws StreamException when l exceeds MAX_DATALEN.
// The field assignments are on lines not shown.
1759 void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos)
1762 if (l > MAX_DATALEN)
1763 throw StreamException("Packet data too large");
1770 // -----------------------------------
// Writes the len payload bytes of this packet to out, without any framing.
1771 void ChanPacket::writeRaw(Stream &out)
1773 out.write(data,len);
1775 // -----------------------------------
// Writes this packet in PeerCast framing. The type maps to a four character
// tag (HEAD, META or DATA); for a type other than T_UNKNOWN the length is
// written as a short and the payload follows. The tag write itself and any
// other header fields are on lines not shown.
1776 void ChanPacket::writePeercast(Stream &out)
1778 unsigned int tp = 0;
1781 case T_HEAD: tp = 'HEAD'; break;
1782 case T_META: tp = 'META'; break;
1783 case T_DATA: tp = 'DATA'; break;
1786 if (type != T_UNKNOWN)
1789 out.writeShort(len);
1791 out.write(data,len);
1794 // -----------------------------------
// Deserialises a tagged PeerCast packet header from `in`.
// The four-character tag selects the packet type (anything other than HEAD, DATA
// or META becomes T_UNKNOWN), then the payload length is read as a short.
// Throws StreamException("Bad ChanPacket") when the length exceeds MAX_DATALEN.
1795 void ChanPacket::readPeercast(Stream &in)
1797 unsigned int tp = in.readTag();
1801 case 'HEAD': type = T_HEAD; break;
1802 case 'DATA': type = T_DATA; break;
1803 case 'META': type = T_META; break;
1804 default: type = T_UNKNOWN;
1806 len = in.readShort();
// Reject lengths that would not fit in `data`.
1808 if (len > MAX_DATALEN)
1809 throw StreamException("Bad ChanPacket");
1812 // -----------------------------------
1813 int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
1823 for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
1825 //ChanPacket *src = &buf.packets[i%MAX_PACKETS];
1826 ChanPacketv *src = &buf.packets[i%MAX_PACKETS];
1827 if (src->type & accept)
1829 if (src->pos >= reqPos)
1832 //packets[writePos++] = *src;
1833 packets[writePos++].init(*src);
1842 return lastPos-firstPos;
1845 // -----------------------------------
1846 bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
1853 unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
1854 unsigned int fpos = getStreamPos(firstPos);
1855 unsigned int lpos = getStreamPos(lastPos);
1856 if (spos < fpos && (fpos < lpos || spos > lpos + bound))
1860 for(unsigned int i=firstPos; i<=lastPos; i++)
1862 //ChanPacket &p = packets[i%MAX_PACKETS];
1863 ChanPacketv &p = packets[i%MAX_PACKETS];
1864 if (p.pos >= spos && p.pos - spos <= bound)
1875 // -----------------------------------
1876 unsigned int ChanPacketBuffer::getLatestPos()
1881 return getStreamPos(lastPos);
1884 // -----------------------------------
1885 unsigned int ChanPacketBuffer::getOldestPos()
1890 return getStreamPos(firstPos);
1893 // -----------------------------------
1894 unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
1896 unsigned int min = getStreamPos(safePos);
1897 unsigned int max = getStreamPos(lastPos);
1908 // -----------------------------------
// Returns the stream position (pos) of the packet stored in ring slot
// index % MAX_PACKETS.
1909 unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
1911 return packets[index%MAX_PACKETS].pos;
1913 // -----------------------------------
// Returns the stream position just past the packet stored in ring slot
// index % MAX_PACKETS, i.e. that packet's pos plus its len.
1914 unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
1916 return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
1918 // -----------------------------------
// Stores `pack` in ring slot writePos % MAX_PACKETS and maintains the firstPos /
// safePos window markers and lastWriteTime.
// The return value and the use of updateReadPos are on lines not visible in this
// listing.
1919 bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos)
// Optional position check, enabled by the keepDownstreams setting.
1923 if (servMgr->keepDownstreams) {
1924 unsigned int lpos = getLatestPos();
// Unsigned distance from the newest buffered position to the incoming packet.
1925 unsigned int diff = pack.pos - lpos;
// When the newest buffered packet is a header, lpos is zeroed, which disables the test below.
1926 if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
// diff == 0: same position as the newest packet. diff > 0xfff00000: the unsigned
// subtraction wrapped, i.e. pack.pos is slightly behind the newest position.
1927 if (lpos && (diff == 0 || diff > 0xfff00000)) {
1928 LOG_DEBUG("* latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
1929 lastSkipTime = sys->getTime();
1934 if (willSkip()) // too far behind
1936 lastSkipTime = sys->getTime();
// Record the write index in the packet, then copy it into its ring slot.
1942 pack.sync = writePos;
1943 packets[writePos%MAX_PACKETS].init(pack);
1945 // LOG_DEBUG("packet.len = %d",pack.len);
// Once at least MAX_PACKETS have been written, the oldest retained index trails writePos by MAX_PACKETS.
1951 if (writePos >= MAX_PACKETS)
1952 firstPos = writePos-MAX_PACKETS;
// safePos trails writePos by NUM_SAFEPACKETS.
1956 if (writePos >= NUM_SAFEPACKETS)
1957 safePos = writePos - NUM_SAFEPACKETS;
1964 lastWriteTime = sys->getTime();
1972 // -----------------------------------
// Copies the packet at the current read index into `pack`, waiting for the writer
// if the reader has caught up.
// Throws StreamException("Read too far behind") when readPos has fallen below firstPos.
// Throws TimeoutException when no packet becomes available within 30 units of
// sys->getTime() (presumably seconds - confirm).
1973 void ChanPacketBuffer::readPacket(ChanPacket &pack)
// Start time for the timeout below.
1976 unsigned int tim = sys->getTime();
1978 if (readPos < firstPos)
1979 throw StreamException("Read too far behind");
// Nothing new to read yet.
1981 while (readPos >= writePos)
1984 if ((sys->getTime() - tim) > 30)
1985 throw TimeoutException();
1988 pack.init(packets[readPos%MAX_PACKETS]);
1993 // -----------------------------------
// Priority-aware variant of readPacket: among the unread packets it selects the one
// with the highest priority rather than simply the oldest.
// Throws StreamException("Read too far behind") when readPos has fallen below firstPos.
// Throws TimeoutException when no packet becomes available within 30 units of
// sys->getTime() (presumably seconds - confirm).
1994 void ChanPacketBuffer::readPacketPri(ChanPacket &pack)
1996 unsigned int tim = sys->getTime();
1998 if (readPos < firstPos)
1999 throw StreamException("Read too far behind");
2001 while (readPos >= writePos)
2004 if ((sys->getTime() - tim) > 30)
2005 throw TimeoutException();
// Start with the oldest unread packet, then scan the remaining unread slots.
// The comparison is strict, so the earliest packet wins among equal priorities.
2008 ChanPacketv *best = &packets[readPos % MAX_PACKETS];
2009 for (unsigned int i = readPos + 1; i < writePos; i++) {
2010 if (packets[i % MAX_PACKETS].priority > best->priority)
2011 best = &packets[i % MAX_PACKETS];
// Overwrite the chosen slot with the packet at readPos, so that packet is not lost
// when the read index moves on (the copy of *best into `pack` is presumably on a
// line not visible in this listing - confirm).
2014 best->init(packets[readPos % MAX_PACKETS]);
2019 // -----------------------------------
// Returns true when the number of written-but-unread packets (writePos - readPos)
// has reached MAX_PACKETS, the ring size used for slot indexing.
2020 bool ChanPacketBuffer::willSkip()
2022 return ((writePos-readPos) >= MAX_PACKETS);
2025 // -----------------------------------
// Formats this channel's stream path into `str` as "/stream/<idStr><ext>", where
// <ext> is the file extension for the channel's content type.
// NOTE(review): sprintf does not bound the write; `str` must be large enough for the
// prefix, the id string and the extension - caller buffer sizes are not visible here.
2026 void Channel::getStreamPath(char *str)
2032 sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));
2037 // -----------------------------------
2038 void ChanMgr::startSearch(ChanInfo &info)
2045 searchActive = true;
2047 // -----------------------------------
2048 void ChanMgr::quit()
2050 LOG_DEBUG("ChanMgr is quitting..");
2053 // -----------------------------------
2054 int ChanMgr::numIdleChannels()
2057 Channel *ch = channel;
2061 if (ch->thread.active)
2062 if (ch->status == Channel::S_IDLE)
2068 // -----------------------------------
2069 void ChanMgr::closeOldestIdle()
2071 unsigned int idleTime = (unsigned int)-1;
2072 Channel *ch = channel,*oldest=NULL;
2076 if (ch->thread.active)
2077 if (ch->status == Channel::S_IDLE)
2078 if (ch->lastIdleTime < idleTime)
2081 idleTime = ch->lastIdleTime;
2087 oldest->thread.active = false;
2090 // -----------------------------------
2091 void ChanMgr::closeAll()
2093 Channel *ch = channel;
2096 if (ch->thread.active)
2097 ch->thread.shutdown();
2101 // -----------------------------------
2102 Channel *ChanMgr::findChannelByNameID(ChanInfo &info)
2104 Channel *ch = channel;
2108 if (ch->info.matchNameID(info))
2115 // -----------------------------------
2116 Channel *ChanMgr::findChannelByName(const char *n)
2118 Channel *ch = channel;
2122 if (stricmp(ch->info.name,n)==0)
2129 // -----------------------------------
2130 Channel *ChanMgr::findChannelByIndex(int index)
2133 Channel *ch = channel;
2146 // -----------------------------------
2147 Channel *ChanMgr::findChannelByMount(const char *str)
2149 Channel *ch = channel;
2153 if (strcmp(ch->mount,str)==0)
2160 // -----------------------------------
2161 Channel *ChanMgr::findChannelByID(GnuID &id)
2163 Channel *ch = channel;
2167 if (ch->info.id.isSame(id))
2173 // -----------------------------------
2174 Channel *ChanMgr::findChannelByChannelID(int id)
2177 Channel *ch = channel;
2180 if (ch->isActive()){
2181 if (ch->channel_id == id){
2189 // -----------------------------------
2190 int ChanMgr::findChannels(ChanInfo &info, Channel **chlist, int max)
2193 Channel *ch = channel;
2197 if (ch->info.match(info))
2207 // -----------------------------------
2208 int ChanMgr::findChannelsByStatus(Channel **chlist, int max, Channel::STATUS status)
2211 Channel *ch = channel;
2215 if (ch->status == status)
2225 // -----------------------------------
2226 Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected)
2228 Channel *c = chanMgr->createChannel(info,NULL);
2231 c->stayConnected = stayConnected;
2237 // -----------------------------------
2238 Channel *ChanMgr::findAndRelay(ChanInfo &info)
2241 info.id.toStr(idStr);
2242 LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());
2244 if(!isIndexTxt(&info)) // for PCRaw (popup)
2245 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
2250 WLockBlock wb(&(chanMgr->channellock));
2254 c = findChannelByNameID(info);
2258 c = chanMgr->createChannel(info,NULL);
2261 c->setStatus(Channel::S_SEARCHING);
2264 } else if (!(c->thread.active)){
2265 c->thread.active = true;
2266 c->thread.finish = false;
2267 c->info.lastPlayStart = 0; // reconnect
2268 c->info.lastPlayEnd = 0;
2270 c->finthread->finish = true;
2271 c->finthread = NULL;
2273 if (c->status != Channel::S_CONNECTING && c->status != Channel::S_SEARCHING){
2274 c->setStatus(Channel::S_SEARCHING);
2281 for(int i=0; i<600; i++) // search for 1 minute.
2286 c = findChannelByNameID(info);
2290 // peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
2295 if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
2305 // -----------------------------------
2312 currFindAndPlayChannel.clear();
2314 broadcastMsg.clear();
2315 broadcastMsgInterval=10;
2317 broadcastID.generate(PCP_BROADCAST_FLAGS);
2322 icyMetaInterval = 8192;
2323 maxRelaysPerChannel = 1;
2327 minBroadcastTTL = 1;
2328 maxBroadcastTTL = 7;
2330 pushTimeout = 60; // 1 minute
2331 pushTries = 5; // 5 times
2332 maxPushHops = 8; // max 8 hops away
2333 maxUptime = 0; // 0 = none
2335 prefetchTime = 10; // n seconds
2337 hostUpdateInterval = 120; // 2 minutes
2349 // -----------------------------------
2350 bool ChanMgr::writeVariable(Stream &out, const String &var, int index)
2353 if (var == "numHitLists")
2354 sprintf(buf,"%d",numHitLists());
2356 else if (var == "numChannels")
2357 sprintf(buf,"%d",numChannels());
2358 else if (var == "djMessage")
2359 strcpy(buf,broadcastMsg.cstr());
2360 else if (var == "icyMetaInterval")
2361 sprintf(buf,"%d",icyMetaInterval);
2362 else if (var == "maxRelaysPerChannel")
2363 sprintf(buf,"%d",maxRelaysPerChannel);
2364 else if (var == "hostUpdateInterval")
2365 sprintf(buf,"%d",hostUpdateInterval);
2366 else if (var == "broadcastID")
2367 broadcastID.toStr(buf);
2373 out.writeString(buf);
2377 // -----------------------------------
2378 bool Channel::writeVariable(Stream &out, const String &var, int index)
2389 utf8.convertTo(String::T_UNICODESAFE);
2390 strcpy(buf,utf8.cstr());
2392 }else if (var == "bitrate")
2394 sprintf(buf,"%d",info.bitrate);
2396 }else if (var == "srcrate")
2400 unsigned int tot = sourceData->getSourceRate();
2401 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
2405 }else if (var == "genre")
2408 utf8.convertTo(String::T_UNICODESAFE);
2409 strcpy(buf,utf8.cstr());
2410 }else if (var == "desc")
2413 utf8.convertTo(String::T_UNICODESAFE);
2414 strcpy(buf,utf8.cstr());
2415 }else if (var == "comment")
2417 utf8 = info.comment;
2418 utf8.convertTo(String::T_UNICODESAFE);
2419 strcpy(buf,utf8.cstr());
2420 }else if (var == "uptime")
2423 if (info.lastPlayStart)
2424 uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
2427 strcpy(buf,uptime.cstr());
2429 else if (var == "type")
2430 sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
2431 else if (var == "ext")
2432 sprintf(buf,"%s",ChanInfo::getTypeExt(info.contentType));
2433 else if (var == "proto") {
2434 switch(info.contentType) {
2435 case ChanInfo::T_WMA:
2436 case ChanInfo::T_WMV:
2437 sprintf(buf, "mms://");
2440 sprintf(buf, "http://");
2443 else if (var == "localRelays")
2444 sprintf(buf,"%d",localRelays());
2445 else if (var == "localListeners")
2446 sprintf(buf,"%d",localListeners());
2448 else if (var == "totalRelays")
2449 sprintf(buf,"%d",totalRelays());
2450 else if (var == "totalListeners")
2451 sprintf(buf,"%d",totalListeners());
2453 else if (var == "status")
2454 sprintf(buf,"%s",getStatusStr());
2455 else if (var == "keep")
2456 sprintf(buf,"%s",stayConnected?"Yes":"No");
2457 else if (var == "id")
2459 else if (var.startsWith("track."))
2462 if (var == "track.title")
2463 utf8 = info.track.title;
2464 else if (var == "track.artist")
2465 utf8 = info.track.artist;
2466 else if (var == "track.album")
2467 utf8 = info.track.album;
2468 else if (var == "track.genre")
2469 utf8 = info.track.genre;
2470 else if (var == "track.contactURL")
2471 utf8 = info.track.contact;
2473 utf8.convertTo(String::T_UNICODESAFE);
2474 strcpy(buf,utf8.cstr());
2476 }else if (var == "contactURL")
2477 sprintf(buf,"%s",info.url.cstr());
2478 else if (var == "streamPos")
2479 sprintf(buf,"%d",streamPos);
2480 else if (var == "sourceType")
2481 strcpy(buf,getSrcTypeStr());
2482 else if (var == "sourceProtocol")
2483 strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
2484 else if (var == "sourceURL")
2486 if (sourceURL.isEmpty())
2487 sourceHost.host.toStr(buf);
2489 strcpy(buf,sourceURL.cstr());
2491 else if (var == "headPos")
2492 sprintf(buf,"%d",headPack.pos);
2493 else if (var == "headLen")
2494 sprintf(buf,"%d",headPack.len);
2495 else if (var == "numHits")
2497 ChanHitList *chl = chanMgr->findHitListByID(info.id);
2500 // numHits = chl->numHits();
2508 sprintf(buf,"%d",numHits);
2509 } else if (var == "isBroadcast")
2510 strcpy(buf, (type == T_BROADCAST) ? "1":"0");
2514 out.writeString(buf);
2518 // -----------------------------------
2519 void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
2521 Channel *c = channel;
2524 if ( c->isActive() && c->isBroadcasting() )
2525 c->broadcastTrackerUpdate(svID,force);
2531 // -----------------------------------
2532 int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
2536 Channel *c = channel;
2539 if (c->sendPacketUp(pack,chanID,srcID,destID))
2547 // -----------------------------------
2548 void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL)
2550 //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())
2553 Host sh = servMgr->serverHost;
2554 bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);
2555 bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull();
2556 bool stable = servMgr->totalStreams>0;
2563 Channel *c = channel;
2569 bool tracker = c->isBroadcasting();
2571 int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds
2579 if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl))
2585 serv->outputPacket(hit,false);
2589 LOG_NETWORK("Sent channel to %d servents, TTL %d",numOut,ttl);
2596 // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut);
2599 // -----------------------------------
2600 void ChanMgr::setUpdateInterval(unsigned int v)
2602 hostUpdateInterval = v;
2606 // -----------------------------------
2610 MemoryStream mem(pack.data,sizeof(pack.data));
2611 AtomStream atom(mem);
2612 atom.writeParent(PCP_BCST,3);
2613 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
2614 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
2615 atom.writeParent(PCP_MESG,1);
2616 atom.writeString(PCP_MESG_DATA,msg.cstr());
2626 PCPStream::readAtom(atom,bcs);
2627 //int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
2628 //int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
2629 //int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
2630 //LOG_DEBUG("Sent message to %d clients",cnt);
2632 // -----------------------------------
2633 void ChanMgr::setBroadcastMsg(String &msg)
2635 if (!msg.isSame(broadcastMsg))
2639 Channel *c = channel;
2642 if (c->isActive() && c->isBroadcasting())
2644 ChanInfo newInfo = c->info;
2645 newInfo.comment = broadcastMsg;
2646 c->updateInfo(newInfo);
2655 // -----------------------------------
2656 void ChanMgr::clearHitLists()
2659 // LOG_DEBUG("clearHitLists HITLISTLOCK ON-------------");
2660 chanMgr->hitlistlock.on();
2663 peercastApp->delChannel(&hitlist->info);
2665 ChanHitList *next = hitlist->next;
2671 // LOG_DEBUG("clearHitLists HITLISTLOCK OFF-------------");
2672 chanMgr->hitlistlock.off();
2674 // -----------------------------------
2675 Channel *ChanMgr::deleteChannel(Channel *delchan)
2679 Channel *ch = channel,*prev=NULL,*next=NULL;
2685 Channel *next = ch->next;
2691 if (delchan->sourceStream){
2692 delchan->sourceStream->parent = NULL;
2708 // -----------------------------------
2709 Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount)
2722 nc->info.lastPlayStart = 0;
2723 nc->info.lastPlayEnd = 0;
2724 nc->info.status = ChanInfo::S_UNKNOWN;
2726 nc->mount.set(mount);
2727 nc->setStatus(Channel::S_WAIT);
2728 nc->type = Channel::T_ALLOCATED;
2729 nc->info.createdTime = sys->getTime();
2731 LOG_CHANNEL("New channel created");
2736 // -----------------------------------
2737 int ChanMgr::pickHits(ChanHitSearch &chs)
2739 ChanHitList *chl = hitlist;
2743 if (chl->pickHits(chs))
2753 // -----------------------------------
2754 ChanHitList *ChanMgr::findHitList(ChanInfo &info)
2756 ChanHitList *chl = hitlist;
2760 if (chl->info.matchNameID(info))
2767 // -----------------------------------
2768 ChanHitList *ChanMgr::findHitListByID(GnuID &id)
2770 ChanHitList *chl = hitlist;
2774 if (chl->info.id.isSame(id))
2780 // -----------------------------------
2781 int ChanMgr::numHitLists()
2784 ChanHitList *chl = hitlist;
2793 // -----------------------------------
2794 ChanHitList *ChanMgr::addHitList(ChanInfo &info)
2796 ChanHitList *chl = new ChanHitList();
2799 chl->next = hitlist;
2804 chl->info.createdTime = sys->getTime();
2805 peercastApp->addChannel(&chl->info);
2810 // -----------------------------------
2811 void ChanMgr::clearDeadHits(bool clearTrackers)
2813 unsigned int interval;
2815 if (servMgr->isRoot)
2816 interval = 1200; // mainly for old 0.119 clients
2818 interval = hostUpdateInterval+120;
2820 chanMgr->hitlistlock.on();
2821 ChanHitList *chl = hitlist,*prev = NULL;
2826 if (chl->clearDeadHits(interval,clearTrackers) == 0)
2828 if (!isBroadcasting(chl->info.id))
2830 if (!chanMgr->findChannelByID(chl->info.id))
2832 peercastApp->delChannel(&chl->info);
2834 ChanHitList *next = chl->next;
2850 chanMgr->hitlistlock.off();
2852 // -----------------------------------
2853 bool ChanMgr::isBroadcasting(GnuID &id)
2855 Channel *ch = findChannelByID(id);
2857 return ch->isBroadcasting();
2861 // -----------------------------------
2862 bool ChanMgr::isBroadcasting()
2864 Channel *ch = channel;
2868 if (ch->isBroadcasting())
2876 // -----------------------------------
2877 int ChanMgr::numChannels()
2880 Channel *ch = channel;
2890 // -----------------------------------
2891 void ChanMgr::deadHit(ChanHit &hit)
2893 ChanHitList *chl = findHitListByID(hit.chanID);
2897 // -----------------------------------
2898 void ChanMgr::delHit(ChanHit &hit)
2900 ChanHitList *chl = findHitListByID(hit.chanID);
2905 // -----------------------------------
2906 void ChanMgr::addHit(Host &h,GnuID &id,bool tracker)
2912 hit.rhost[1].init();
2913 hit.tracker = tracker;
2918 // -----------------------------------
2919 ChanHit *ChanMgr::addHit(ChanHit &h)
2922 lastHit = sys->getTime();
2924 ChanHitList *hl=NULL;
2926 hl = findHitListByID(h.chanID);
2932 hl = addHitList(info);
2937 return hl->addHit(h);
2942 // -----------------------------------
2943 bool ChanMgr::findParentHit(ChanHit &p)
2945 ChanHitList *hl=NULL;
2947 chanMgr->hitlistlock.on();
2949 hl = findHitListByID(p.chanID);
2953 ChanHit *ch = hl->hit;
2956 if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip)
2957 && (ch->rhost[0].port == p.uphost.port))
2959 chanMgr->hitlistlock.off();
2966 chanMgr->hitlistlock.off();
2971 // -----------------------------------
2972 class ChanFindInfo : public ThreadInfo
2978 // -----------------------------------
2979 THREAD_PROC findAndPlayChannelProc(ThreadInfo *th)
2981 ChanFindInfo *cfi = (ChanFindInfo *)th;
2987 Channel *ch = chanMgr->findChannelByNameID(info);
2989 chanMgr->currFindAndPlayChannel = info.id;
2992 ch = chanMgr->findAndRelay(info);
2996 // check that a different channel hasn't been selected already.
2997 if (chanMgr->currFindAndPlayChannel.isSame(ch->info.id))
2998 chanMgr->playChannel(ch->info);
3001 ch->stayConnected = cfi->keep;
3007 // -----------------------------------
3008 void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep)
3010 ChanFindInfo *cfi = new ChanFindInfo;
3013 cfi->func = findAndPlayChannelProc;
3016 sys->startThread(cfi);
3018 // -----------------------------------
3019 void ChanMgr::playChannel(ChanInfo &info)
3022 char str[128],fname[256],idStr[128];
3024 sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
3025 info.id.toStr(idStr);
3027 PlayList::TYPE type;
3030 if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
3032 type = PlayList::T_ASX;
3033 // WMP seems to have a bug where it doesn't re-read asx files if they have the same name
3034 // so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
3035 if (servMgr->getModulePath) //JP-EX
3037 peercastApp->getDirectory();
3038 sprintf(fname,"%s/%s.asx",servMgr->modulePath,idStr);
3040 sprintf(fname,"%s/%s.asx",peercastApp->getPath(),idStr);
3041 }else if (info.contentType == ChanInfo::T_OGM)
3043 type = PlayList::T_RAM;
3044 if (servMgr->getModulePath) //JP-EX
3046 peercastApp->getDirectory();
3047 sprintf(fname,"%s/play.ram",servMgr->modulePath);
3049 sprintf(fname,"%s/play.ram",peercastApp->getPath());
3053 type = PlayList::T_SCPLS;
3054 if (servMgr->getModulePath) //JP-EX
3056 peercastApp->getDirectory();
3057 sprintf(fname,"%s/play.pls",servMgr->modulePath);
3059 sprintf(fname,"%s/play.pls",peercastApp->getPath());
3063 PlayList *pls = new PlayList(type,1);
3064 pls->addChannel(str,info);
3067 LOG_DEBUG("Writing %s",fname);
3069 file.openWriteReplace(fname);
3074 LOG_DEBUG("Executing: %s",fname);
3075 sys->executeFile(fname);
3080 // -----------------------------------
3081 ChanHitList::ChanHitList()
3088 // -----------------------------------
3089 ChanHitList::~ChanHitList()
3091 chanMgr->hitlistlock.on();
3093 hit = deleteHit(hit);
3094 chanMgr->hitlistlock.off();
3096 // -----------------------------------
3097 void ChanHit::pickNearestIP(Host &h)
3099 for(int i=0; i<2; i++)
3101 if (h.classType() == rhost[i].classType())
3109 // -----------------------------------
3110 void ChanHit::init()
3122 dead = tracker = firewalled = stable = yp = false;
3123 recv = cin = direct = relay = true;
3124 relayfull = chfull = ratefull = false;
3133 version_ex_prefix[0] = ' ';
3134 version_ex_prefix[1] = ' ';
3135 version_ex_number = 0;
3144 oldestPos = newestPos = 0;
3149 // -----------------------------------
3150 void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFull,unsigned int bitrate, Channel* ch, unsigned int oldp,unsigned int newp)
3153 firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
3154 numListeners = numl;
3157 stable = servMgr->totalStreams>0;
3158 sessionID = servMgr->sessionID;
3161 direct = !servMgr->directFull();
3162 // relay = !servMgr->relaysFull();
3163 cin = !servMgr->controlInFull();
3165 relayfull = servMgr->relaysFull();
3168 Channel *c = chanMgr->channel;
3170 unsigned int needRate = 0;
3171 unsigned int allRate = 0;
3173 if (c->isPlaying()){
3174 allRate += c->info.bitrate * c->localRelays();
3175 if ((c != ch) && (c->localRelays() == 0)){
3176 if(!isIndexTxt(c)) // for PCRaw (relay)
3178 needRate+=c->info.bitrate;
3183 unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
3184 int diff = servMgr->maxRelays - numRelay;
3185 if (ch->localRelays()){
3186 if (noRelay > diff){
3194 // ratefull = servMgr->bitrateFull(needRate+bitrate);
3195 ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
3197 if (!isIndexTxt(ch))
3198 relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
3200 relay = (!chfull) && (!ratefull); // for PCRaw (relay)
3203 LOG_DEBUG("Reject by relay full");
3206 LOG_DEBUG("Reject by channel full");
3209 LOG_DEBUG("Reject by rate: Max=%d Now=%d Need=%d ch=%d", servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
3212 host = servMgr->serverHost;
3214 version = PCP_CLIENT_VERSION;
3215 version_vp = PCP_CLIENT_VERSION_VP;
3217 strncpy(version_ex_prefix, PCP_CLIENT_VERSION_EX_PREFIX,2);
3218 version_ex_number = PCP_CLIENT_VERSION_EX_NUMBER;
3220 version_ex_prefix[0] = ' ';
3221 version_ex_prefix[1] = ' ';
3222 version_ex_number = 0;
3225 status = ch->status;
3227 rhost[0] = Host(host.ip,host.port);
3228 rhost[1] = Host(ClientSocket::getIP(NULL),host.port);
3236 uphost.ip = ch->sourceHost.host.ip;
3237 uphost.port = ch->sourceHost.host.port;
3241 // -----------------------------------
// Serialises this hit as a PCP_HOST parent atom followed by its child atoms.
//   atom   - output atom stream
//   chanID - when set, a PCP_HOST_CHANID child is included
3242 void ChanHit::writeAtoms(AtomStream &atom,GnuID &chanID)
3244 bool addChan=chanID.isSet();
// Upstream-host atoms are only emitted when an upstream IP is known.
3245 bool uphostdata=(uphost.ip != 0);
// Pack the boolean host properties into the FLAGS1 bit field.
3248 if (recv) fl1 |= PCP_HOST_FLAGS1_RECV;
3249 if (relay) fl1 |= PCP_HOST_FLAGS1_RELAY;
3250 if (direct) fl1 |= PCP_HOST_FLAGS1_DIRECT;
3251 if (cin) fl1 |= PCP_HOST_FLAGS1_CIN;
3252 if (tracker) fl1 |= PCP_HOST_FLAGS1_TRACKER;
3253 if (firewalled) fl1 |= PCP_HOST_FLAGS1_PUSH;
// The child count must match what is written below: 13 unconditional atoms (ID,
// 2x IP, 2x PORT, NUML, NUMR, UPTIME, VERSION, VERSION_VP, FLAGS1, OLDPOS, NEWPOS),
// plus 1 for CHANID, 3 for the UPHOST group and 2 for the VERSION_EX pair.
// Keep this sum in step with any atom added or removed.
3255 atom.writeParent(PCP_HOST,13 + (addChan?1:0) + (uphostdata?3:0) + (version_ex_number?2:0));
3258 atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
3259 atom.writeBytes(PCP_HOST_ID,sessionID.id,16);
// Two IP/PORT pairs are written, one per rhost entry.
3260 atom.writeInt(PCP_HOST_IP,rhost[0].ip);
3261 atom.writeShort(PCP_HOST_PORT,rhost[0].port);
3262 atom.writeInt(PCP_HOST_IP,rhost[1].ip);
3263 atom.writeShort(PCP_HOST_PORT,rhost[1].port);
3264 atom.writeInt(PCP_HOST_NUML,numListeners);
3265 atom.writeInt(PCP_HOST_NUMR,numRelays);
3266 atom.writeInt(PCP_HOST_UPTIME,upTime);
3267 atom.writeInt(PCP_HOST_VERSION,version);
3268 atom.writeInt(PCP_HOST_VERSION_VP,version_vp);
// Extended version info is only sent when an extended version number is set.
3269 if (version_ex_number){
3270 atom.writeBytes(PCP_HOST_VERSION_EX_PREFIX,version_ex_prefix,2);
3271 atom.writeShort(PCP_HOST_VERSION_EX_NUMBER,version_ex_number);
3273 atom.writeChar(PCP_HOST_FLAGS1,fl1);
3274 atom.writeInt(PCP_HOST_OLDPOS,oldestPos);
3275 atom.writeInt(PCP_HOST_NEWPOS,newestPos);
// Upstream host group (presumably guarded by uphostdata on a line not visible here - confirm).
3277 atom.writeInt(PCP_HOST_UPHOST_IP,uphost.ip);
3278 atom.writeInt(PCP_HOST_UPHOST_PORT,uphost.port);
3279 atom.writeInt(PCP_HOST_UPHOST_HOPS,uphostHops);
3282 // -----------------------------------
3283 bool ChanHit::writeVariable(Stream &out, const String &var)
3287 if (var == "rhost0")
3289 if (servMgr->enableGetName) //JP-EX s
3295 strcpy(buf,"<font color=red>");
3297 strcpy(buf,"<font color=orange>");
3302 strcpy(buf,"<font color=purple>");
3304 strcpy(buf,"<font color=blue>");
3307 strcpy(buf,"<font color=green>");
3311 rhost[0].toStr(buf2);
3315 if (ClientSocket::getHostname(h_name,rhost[0].ip))
3321 strcat(buf,"</font>");
3324 rhost[0].toStr(buf);
3326 else if (var == "rhost1")
3327 rhost[1].toStr(buf);
3328 else if (var == "numHops")
3329 sprintf(buf,"%d",numHops);
3330 else if (var == "numListeners")
3331 sprintf(buf,"%d",numListeners);
3332 else if (var == "numRelays")
3333 sprintf(buf,"%d",numRelays);
3334 else if (var == "uptime")
3337 timeStr.setFromStopwatch(upTime);
3338 strcpy(buf,timeStr.cstr());
3339 }else if (var == "update")
3343 timeStr.setFromStopwatch(sys->getTime()-time);
3346 strcpy(buf,timeStr.cstr());
3347 }else if (var == "isFirewalled"){
3348 sprintf(buf,"%d",firewalled?1:0);
3349 }else if (var == "version"){
3350 sprintf(buf,"%d",version);
3351 }else if (var == "agent"){
3353 if (version_ex_number){
3354 sprintf(buf, "v0.%d(%c%c%04d)", version, version_ex_prefix[0], version_ex_prefix[1], version_ex_number);
3355 } else if (version_vp){
3356 sprintf(buf,"v0.%d(VP%04d)", version, version_vp);
3358 sprintf(buf,"v0.%d", version);
3364 else if (var == "check")
3367 strcpy(buf, "<a href=\"#\" onclick=\"checkip('");
3368 rhost[0].IPtoStr(buf2);
3370 strcat(buf, "')\">_</a>");
3372 else if (var == "uphost") // tree
3374 else if (var == "uphostHops") // tree
3375 sprintf(buf,"%d",uphostHops);
3376 else if (var == "canRelay") // tree
3377 sprintf(buf, "%d",relay);
3381 out.writeString(buf);
3385 // -----------------------------------
3386 int ChanHitList::getTotalListeners()
3393 cnt+=h->numListeners;
3398 // -----------------------------------
3399 int ChanHitList::getTotalRelays()
3411 // -----------------------------------
3412 int ChanHitList::getTotalFirewalled()
3426 // -----------------------------------
3427 int ChanHitList::contactTrackers(bool connected, int numl, int nums, int uptm)
3432 void ChanHitList::clearHits(bool flg)
3434 ChanHit *c = hit, *prev = NULL;
3437 if (flg || (c->numHops != 0)){
3438 ChanHit *next = c->next;
3453 // -----------------------------------
3454 ChanHit *ChanHitList::deleteHit(ChanHit *ch)
3456 ChanHit *c = hit,*prev=NULL;
3461 ChanHit *next = c->next;
3477 // -----------------------------------
3478 ChanHit *ChanHitList::addHit(ChanHit &h)
3480 char ip0str[64],ip1str[64];
3481 h.rhost[0].toStr(ip0str);
3482 h.rhost[1].toStr(ip1str);
3484 h.uphost.toStr(uphostStr);
3486 LOG_DEBUG("Add hit: F%dT%dR%d %s/%s <- %s(%d)",h.firewalled,h.tracker,h.relay,ip0str,ip1str,uphostStr, h.uphostHops);
3488 LOG_DEBUG("Add hit: F%dT%dR%d %s/%s",h.firewalled,h.tracker,h.relay,ip0str,ip1str);
3491 // don't add our own hits
3492 if (servMgr->sessionID.isSame(h.sessionID))
3496 lastHitTime = sys->getTime();
3497 h.time = lastHitTime;
3502 if ((ch->rhost[0].ip == h.rhost[0].ip) && (ch->rhost[0].port == h.rhost[0].port))
3503 if (((ch->rhost[1].ip == h.rhost[1].ip) && (ch->rhost[1].port == h.rhost[1].port)) || (!ch->rhost[1].isValid()))
3507 if (ch->numHops > 0 && h.numHops == 0)
3508 // downstream hit received as RelayHost
3510 ChanHit *next = ch->next;
3519 // clear hits with same session ID (IP may have changed)
3520 if (h.sessionID.isSet())
3526 if (ch->sessionID.isSame(h.sessionID))
3538 ChanHit *ch = new ChanHit();
3540 ch->chanID = info.id;
3550 // -----------------------------------
// Processes hits in this list that are dead or stale, holding chanMgr->hitlistlock
// for the duration.
//   timeout       - maximum age of a hit, in sys->getTime() units
//   clearTrackers - when false, tracker hits are exempt from the age test
// The iteration over hits and the return value are on lines not visible in this listing.
3551 int ChanHitList::clearDeadHits(unsigned int timeout, bool clearTrackers)
3554 unsigned int ctime = sys->getTime();
3556 // LOG_DEBUG("clearDeadHits HITLISTLOCK ON-------------");
3557 chanMgr->hitlistlock.on();
// A hit qualifies when it is flagged dead, OR when it is older than `timeout` AND
// (clearTrackers is set or the hit is not a tracker). Because && binds tighter
// than ||, the age/tracker test is not applied to hits already flagged dead.
// NOTE(review): `!clearTrackers & !ch->tracker` uses bitwise & on two bools; the
// result matches && here, and the whole inner clause reduces to
// `clearTrackers || !ch->tracker`. Consider adding explicit parentheses and &&.
3563 if (ch->dead || ((ctime-ch->time) > timeout) && (clearTrackers || (!clearTrackers & !ch->tracker)))
3565 // ch = deleteHit(ch);
// Firewalled hits are handled subject to servMgr->kickKeepTime: the branch below is
// taken when that setting is 0 or the hit is older than it.
3567 if (ch->firewalled){
3568 // LOG_DEBUG("kickKeepTime = %d, %d", servMgr->kickKeepTime, ctime-ch->time);
3569 if ( (servMgr->kickKeepTime == 0) || ((ctime-ch->time) > servMgr->kickKeepTime) ){
3573 ch->numListeners = 0;
3586 // LOG_DEBUG("clearDeadHits HITLISTLOCK OFF-------------");
3587 chanMgr->hitlistlock.off();
3592 // -----------------------------------
3593 void ChanHitList::deadHit(ChanHit &h)
3595 char ip0str[64],ip1str[64];
3596 h.rhost[0].toStr(ip0str);
3597 h.rhost[1].toStr(ip1str);
3598 LOG_DEBUG("Dead hit: %s/%s",ip0str,ip1str);
3604 if (ch->rhost[0].isSame(h.rhost[0]) && ch->rhost[1].isSame(h.rhost[1]))
3611 // -----------------------------------
3612 void ChanHitList::delHit(ChanHit &h)
3614 char ip0str[64],ip1str[64];
3615 h.rhost[0].toStr(ip0str);
3616 h.rhost[1].toStr(ip1str);
3617 LOG_DEBUG("Del hit: %s/%s",ip0str,ip1str);
3623 if (ch->rhost[0].isSame(h.rhost[0]) && ch->rhost[1].isSame(h.rhost[1]))
3631 // -----------------------------------
3632 int ChanHitList::numHits()
3638 if (ch->host.ip && !ch->dead && ch->numHops)
3645 // -----------------------------------
3646 int ChanHitList::numListeners()
3652 if (ch->host.ip && !ch->dead && ch->numHops)
3653 cnt += ch->numListeners;
3659 // -----------------------------------
3660 int ChanHitList::numRelays()
3666 if (ch->host.ip && !ch->dead)
3667 cnt += ch->numRelays;
3674 // -----------------------------------
3675 int ChanHitList::numTrackers()
3681 if ((ch->host.ip && !ch->dead) && (ch->tracker))
3687 // -----------------------------------
3688 int ChanHitList::numFirewalled()
3694 if (ch->host.ip && !ch->dead)
3695 cnt += ch->firewalled?1:0;
3700 // -----------------------------------
3701 int ChanHitList::closestHit()
3703 unsigned int hop=10000;
3707 if (ch->host.ip && !ch->dead)
3708 if (ch->numHops < hop)
3715 // -----------------------------------
3716 int ChanHitList::furthestHit()
3722 if (ch->host.ip && !ch->dead)
3723 if (ch->numHops > hop)
3730 // -----------------------------------
3731 unsigned int ChanHitList::newestHit()
3733 unsigned int time=0;
3737 if (ch->host.ip && !ch->dead)
3738 if (ch->time > time)
3745 // -----------------------------------
// Selects a candidate hit from this list according to the criteria in chs and
// appends it to chs.best while there is still room in that array.
// NOTE(review): the loop header, the assignments to best/bestP inside the
// branches and the return statement are not visible in this excerpt; the
// comments below describe only the conditions that are shown.
3746 int ChanHitList::pickHits(ChanHitSearch &chs)
3748 ChanHit best,*bestP=NULL;
// Current time; used for the waitDelay test and to stamp lastContact below.
3753 unsigned int ctime = sys->getTime();
// Candidate must have a host IP and must not be marked dead.
3758 if (c->host.ip && !c->dead)
// Skip the hit whose session ID equals chs.excludeID.
3760 if (!chs.excludeID.isSame(c->sessionID))
// waitDelay == 0 disables the check; otherwise at least waitDelay must have
// elapsed since the hit's lastContact.
3761 if ((chs.waitDelay==0) || ((ctime-c->lastContact) >= chs.waitDelay))
// Candidate must not be further away (in hops) than the current best. The
// trailing comment is a disabled time-based condition.
3762 if ((c->numHops<=best.numHops)) // (c->time>=best.time))
// Hits with relay unset pass only when chs.useBusyRelays is set
// (logically equivalent to: c->relay || chs.useBusyRelays).
3763 if (c->relay || (!c->relay && chs.useBusyRelays))
// Hits with cin unset pass only when chs.useBusyControls is set.
3764 if (c->cin || (!c->cin && chs.useBusyControls))
// Tracker search: only tracker hits are candidates.
3767 if (chs.trackersOnly && c->tracker)
// A non-zero matchHost IP was supplied by the caller.
3769 if (chs.matchHost.ip)
// Hit's rhost[0] IP equals matchHost and its rhost[1] is valid.
3771 if ((c->rhost[0].ip == chs.matchHost.ip) && c->rhost[1].isValid())
3775 best.host = best.rhost[1]; // use lan ip
// No matchHost given: the hit's firewalled flag must equal chs.useFirewalled.
3777 }else if (c->firewalled == chs.useFirewalled)
3781 best.host = best.rhost[0]; // use wan ip
// Non-tracker search: only non-tracker hits are candidates.
3783 }else if (!chs.trackersOnly && !c->tracker)
3785 if (chs.matchHost.ip)
3787 if ((c->rhost[0].ip == chs.matchHost.ip) && c->rhost[1].isValid())
3791 best.host = best.rhost[1]; // use lan ip
// As in the tracker branch, but additionally requires that no best hit has
// been chosen yet or that the chosen one has relay unset.
3793 }else if (c->firewalled == chs.useFirewalled && (!bestP || !bestP->relay))
3797 best.host = best.rhost[0]; // use wan ip
// Store the result only while chs.best has free slots.
3807 if (chs.numResults < ChanHitSearch::MAX_RESULTS)
// Record the selection time on the list entry itself (feeds the waitDelay
// test above on later calls).
3810 bestP->lastContact = ctime;
3811 chs.best[chs.numResults++] = best;
3821 // -----------------------------------
// Runs pickHits(chs) and returns 1 when it reports a result and the first
// entry of chs.best has a hop count of 0.
// NOTE(review): the fall-through return value is not visible in this excerpt.
3822 int ChanHitList::pickSourceHits(ChanHitSearch &chs)
3824 if (pickHits(chs) && chs.best[0].numHops == 0) return 1;
3829 // -----------------------------------
// Maps a ChanInfo::TYPE value to its upper-case string name.
// Returns a pointer to a string literal; values without a matching case
// yield "UNKNOWN".
// NOTE(review): the switch header is not visible in this excerpt.
3830 const char *ChanInfo::getTypeStr(TYPE t)
3834 case T_RAW: return "RAW";
3836 case T_MP3: return "MP3";
3837 case T_OGG: return "OGG";
3838 case T_OGM: return "OGM";
3839 case T_WMA: return "WMA";
3841 case T_MOV: return "MOV";
3842 case T_MPG: return "MPG";
3843 case T_NSV: return "NSV";
3844 case T_WMV: return "WMV";
3846 case T_PLS: return "PLS";
3847 case T_ASX: return "ASX";
3849 default: return "UNKNOWN";
3852 // -----------------------------------
// Maps a ChanInfo::PROTOCOL value to its upper-case string name.
// Returns a pointer to a string literal; values without a matching case
// yield "UNKNOWN".
// NOTE(review): the switch header is not visible in this excerpt.
3853 const char *ChanInfo::getProtocolStr(PROTOCOL t)
3857 case SP_PEERCAST: return "PEERCAST";
3858 case SP_HTTP: return "HTTP";
3859 case SP_FILE: return "FILE";
3860 case SP_MMS: return "MMS";
3861 case SP_PCP: return "PCP";
3862 default: return "UNKNOWN";
3865 // -----------------------------------
// Parses a protocol name into a ChanInfo::PROTOCOL value. The comparison uses
// stricmp, i.e. it ignores case.
// NOTE(review): the return statements (including the one for an unrecognised
// name) are not visible in this excerpt; presumably each branch returns the
// corresponding SP_* value - confirm. str is passed to stricmp without a
// null check.
3866 ChanInfo::PROTOCOL ChanInfo::getProtocolFromStr(const char *str)
3868 if (stricmp(str,"PEERCAST")==0)
3870 else if (stricmp(str,"HTTP")==0)
3872 else if (stricmp(str,"FILE")==0)
3874 else if (stricmp(str,"MMS")==0)
3876 else if (stricmp(str,"PCP")==0)
3882 // -----------------------------------
// Returns a string for the given content type; judging by the name this is a
// file-name extension.
// NOTE(review): only the case labels are visible in this excerpt; the returned
// strings and the default are not shown. T_OGM and T_OGG are adjacent labels
// and so presumably share one result - confirm.
3883 const char *ChanInfo::getTypeExt(TYPE t)
3887 case ChanInfo::T_OGM:
3888 case ChanInfo::T_OGG:
3890 case ChanInfo::T_MP3:
3892 case ChanInfo::T_MOV:
3894 case ChanInfo::T_NSV:
3896 case ChanInfo::T_WMV:
3898 case ChanInfo::T_WMA:
3904 // -----------------------------------
// Parses a content-type name into a ChanInfo::TYPE value. The comparison uses
// stricmp, i.e. it ignores case.
// NOTE(review): the return statements (including the one for an unrecognised
// name) are not visible in this excerpt; presumably each branch returns the
// corresponding T_* value - confirm. str is passed to stricmp without a
// null check.
3905 ChanInfo::TYPE ChanInfo::getTypeFromStr(const char *str)
3907 if (stricmp(str,"MP3")==0)
3909 else if (stricmp(str,"OGG")==0)
3911 else if (stricmp(str,"OGM")==0)
3913 else if (stricmp(str,"RAW")==0)
3915 else if (stricmp(str,"NSV")==0)
3917 else if (stricmp(str,"WMA")==0)
3919 else if (stricmp(str,"WMV")==0)
3921 else if (stricmp(str,"PLS")==0)
3923 else if (stricmp(str,"M3U")==0)
3925 else if (stricmp(str,"ASX")==0)
3930 // -----------------------------------
// Tests this channel against inf by ID, then by name.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably either successful test yields true - confirm.
3931 bool ChanInfo::matchNameID(ChanInfo &inf)
// Same channel ID.
3934 if (id.isSame(inf.id))
// Name test is applied only when inf carries a non-empty name; it is a
// containment test (name.contains), not an equality test.
3937 if (!inf.name.isEmpty())
3938 if (name.contains(inf.name))
3943 // -----------------------------------
// Tests this channel against the search criteria held in inf. Each criterion
// is examined only when inf actually specifies it (non-default value or
// non-empty string).
// NOTE(review): the statements executed inside each branch and the return
// statements are not visible in this excerpt, so how the individual tests
// combine into the result cannot be stated from here.
3944 bool ChanInfo::match(ChanInfo &inf)
// Status criterion: present when inf.status is not S_UNKNOWN.
3948 if (inf.status != S_UNKNOWN)
3950 if (status != inf.status)
// Bitrate criterion: present when inf.bitrate is non-zero.
3954 if (inf.bitrate != 0)
3956 if (bitrate == inf.bitrate)
// Channel ID comparison.
3963 if (id.isSame(inf.id))
// Content-type criterion: present when inf.contentType is not T_UNKNOWN.
3968 if (inf.contentType != T_UNKNOWN)
3970 if (contentType == inf.contentType)
// Name criterion: containment test against a non-empty inf.name.
3975 if (!inf.name.isEmpty())
3977 if (name.contains(inf.name))
// Genre criterion: containment test against a non-empty inf.genre.
3982 if (!inf.genre.isEmpty())
3984 if (genre.contains(inf.genre))
3991 // -----------------------------------
// Compares each track field (contact, title, artist, album, genre) with the
// corresponding field of inf and copies those that differ.
// NOTE(review): only the contact and artist assignments are visible in this
// excerpt; the other assignments, the updates of changed and the return are
// elided. Presumably changed is set whenever a field is copied and is then
// returned - confirm.
3992 bool TrackInfo::update(TrackInfo &inf)
3994 bool changed = false;
3996 if (!contact.isSame(inf.contact))
3998 contact = inf.contact;
4002 if (!title.isSame(inf.title))
4008 if (!artist.isSame(inf.artist))
4010 artist = inf.artist;
4014 if (!album.isSame(inf.album))
4020 if (!genre.isSame(inf.genre))
4031 // -----------------------------------
// Brings this channel info up to date from info, field by field, after a few
// validity checks on info.
// NOTE(review): the statements inside the validity checks, several of the
// field assignments, the updates of changed and the return are not visible in
// this excerpt. Presumably the checks reject the update early and changed is
// returned - confirm.
4032 bool ChanInfo::update(ChanInfo &info)
4034 bool changed = false;
// Check on the incoming channel ID being unset.
4039 if (!info.id.isSet())
4042 // only update from chaninfo that has full name etc..
4043 if (info.name.isEmpty())
4046 // check valid broadcaster key
// NOTE(review): any outer condition guarding this comparison is elided.
4049 if (!bcID.isSame(info.bcID))
4051 LOG_ERROR("ChanInfo BC key not valid");
// From here on each field is compared and, where shown, overwritten when it
// differs from the incoming value.
4061 if (bitrate != info.bitrate)
4063 bitrate = info.bitrate;
4067 if (contentType != info.contentType)
4069 contentType = info.contentType;
4073 if (!desc.isSame(info.desc)) //JP-EX
4079 if (!name.isSame(info.name))
4085 if (!comment.isSame(info.comment))
4087 comment = info.comment;
4091 if (!genre.isSame(info.genre))
4097 if (!url.isSame(info.url))
// Track metadata is delegated to track.update(), whose bool result is tested.
4103 if (track.update(info.track))
4109 // -----------------------------------
// Initialises this ChanInfo from the C string n.
// NOTE(review): the body is not visible in this excerpt; judging by the name,
// n is interpreted as a channel name or ID - confirm against the full source.
4110 void ChanInfo::initNameID(const char *n)
4118 // -----------------------------------
// Resets this ChanInfo to default values. The visible part sets the content
// type and the source protocol to their UNKNOWN values.
// NOTE(review): the remaining field resets are not visible in this excerpt.
4119 void ChanInfo::init()
4124 contentType = T_UNKNOWN;
4125 srcProtocol = SP_UNKNOWN;
4137 // -----------------------------------
// Fills the track fields from attributes of XML node n. Each field is read
// from the attribute of the same name (title, contact, artist, album, genre)
// via readXMLString.
// NOTE(review): lines between the signature and the first call are elided.
4138 void ChanInfo::readTrackXML(XML::Node *n)
4141 readXMLString(track.title,n,"title");
4142 readXMLString(track.contact,n,"contact");
4143 readXMLString(track.artist,n,"artist");
4144 readXMLString(track.album,n,"album");
4145 readXMLString(track.genre,n,"genre");
4147 // -----------------------------------
// Computes the time elapsed since lastPlayStart, optionally capped.
// NOTE(review): the declaration of upt and the return statement are not
// visible in this excerpt; presumably upt is returned - confirm.
4148 unsigned int ChanInfo::getUptime()
4150 // calculate uptime and cap if requested by settings.
// A lastPlayStart of 0 yields an uptime of 0; otherwise it is the difference
// between the current system time and lastPlayStart.
4152 upt = lastPlayStart?(sys->getTime()-lastPlayStart):0;
// A chanMgr->maxUptime of 0 disables the cap.
4153 if (chanMgr->maxUptime)
4154 if (upt > chanMgr->maxUptime)
4155 upt = chanMgr->maxUptime;
4158 // -----------------------------------
// Returns the difference between the current system time and createdTime.
// NOTE(review): the result is returned as unsigned int, so a createdTime
// later than the current time would not produce a negative value - confirm
// this cannot occur.
4159 unsigned int ChanInfo::getAge()
4161 return sys->getTime()-createdTime;
4164 // ------------------------------------------
// Reads numc atoms from the stream and stores the recognised track atoms
// (title, creator, URL, album) into the matching track fields.
// NOTE(review): the declarations of c and d and the handling of unrecognised
// atom IDs are not visible in this excerpt.
4165 void ChanInfo::readTrackAtoms(AtomStream &atom,int numc)
4167 for(int i=0; i<numc; i++)
// Reads the next atom header; d is later passed to readString together with
// the destination buffer size.
4170 ID4 id = atom.read(c,d);
4171 if (id == PCP_CHAN_TRACK_TITLE)
4173 atom.readString(track.title.data,sizeof(track.title.data),d);
// The CREATOR atom is stored in the artist field.
4174 }else if (id == PCP_CHAN_TRACK_CREATOR)
4176 atom.readString(track.artist.data,sizeof(track.artist.data),d);
// The URL atom is stored in the contact field.
4177 }else if (id == PCP_CHAN_TRACK_URL)
4179 atom.readString(track.contact.data,sizeof(track.contact.data),d);
4180 }else if (id == PCP_CHAN_TRACK_ALBUM)
4182 atom.readString(track.album.data,sizeof(track.album.data),d);
4187 // ------------------------------------------
// Reads numc atoms from the stream and stores the recognised channel-info
// atoms (name, bitrate, genre, URL, description, comment, type) into the
// matching fields.
// NOTE(review): the declarations of c, d and type and the handling of
// unrecognised atom IDs are not visible in this excerpt.
4188 void ChanInfo::readInfoAtoms(AtomStream &atom,int numc)
4190 for(int i=0; i<numc; i++)
// Reads the next atom header; d is later passed to readString together with
// the destination buffer size.
4193 ID4 id = atom.read(c,d);
4194 if (id == PCP_CHAN_INFO_NAME)
4196 atom.readString(name.data,sizeof(name.data),d);
4197 }else if (id == PCP_CHAN_INFO_BITRATE)
4199 bitrate = atom.readInt();
4200 }else if (id == PCP_CHAN_INFO_GENRE)
4202 atom.readString(genre.data,sizeof(genre.data),d);
4203 }else if (id == PCP_CHAN_INFO_URL)
4205 atom.readString(url.data,sizeof(url.data),d);
4206 }else if (id == PCP_CHAN_INFO_DESC)
4208 atom.readString(desc.data,sizeof(desc.data),d);
4209 }else if (id == PCP_CHAN_INFO_COMMENT)
4211 atom.readString(comment.data,sizeof(comment.data),d);
4212 }else if (id == PCP_CHAN_INFO_TYPE)
// The type arrives as text and is converted to the TYPE enum.
4215 atom.readString(type,sizeof(type),d);
4216 contentType = ChanInfo::getTypeFromStr(type);
4222 // -----------------------------------
// Writes the channel info to the atom stream as a PCP_CHAN_INFO parent
// followed by name, bitrate, genre, URL, description, comment and type.
4223 void ChanInfo::writeInfoAtoms(AtomStream &atom)
// Seven writes follow; the second argument is presumably the child count and
// must be kept in step if fields are added or removed - confirm.
4225 atom.writeParent(PCP_CHAN_INFO,7);
4226 atom.writeString(PCP_CHAN_INFO_NAME,name.cstr());
4227 atom.writeInt(PCP_CHAN_INFO_BITRATE,bitrate);
4228 atom.writeString(PCP_CHAN_INFO_GENRE,genre.cstr());
4229 atom.writeString(PCP_CHAN_INFO_URL,url.cstr());
4230 atom.writeString(PCP_CHAN_INFO_DESC,desc.cstr());
4231 atom.writeString(PCP_CHAN_INFO_COMMENT,comment.cstr());
// The content type is sent in its string form.
4232 atom.writeString(PCP_CHAN_INFO_TYPE,getTypeStr(contentType));
4235 // -----------------------------------
// Writes the track info to the atom stream as a PCP_CHAN_TRACK parent
// followed by title, creator (artist), URL (contact) and album.
4236 void ChanInfo::writeTrackAtoms(AtomStream &atom)
// Four writes follow; the second argument is presumably the child count and
// must be kept in step if fields are added or removed - confirm.
4238 atom.writeParent(PCP_CHAN_TRACK,4);
4239 atom.writeString(PCP_CHAN_TRACK_TITLE,track.title.cstr());
4240 atom.writeString(PCP_CHAN_TRACK_CREATOR,track.artist.cstr());
4241 atom.writeString(PCP_CHAN_TRACK_URL,track.contact.cstr());
4242 atom.writeString(PCP_CHAN_TRACK_ALBUM,track.album.cstr());
4246 // -----------------------------------
// Builds a "channel" XML node describing this channel and returns it.
// The node is allocated with new; presumably the caller takes ownership -
// confirm against callers.
// NOTE(review): most of the arguments passed to the XML::Node constructor and
// the declaration of commentUNI are not visible in this excerpt.
4247 XML::Node *ChanInfo::createChannelXML()
// The string fields are copied and converted to T_UNICODESAFE, leaving the
// members themselves untouched.
4251 String nameUNI = name;
4252 nameUNI.convertTo(String::T_UNICODESAFE);
4254 String urlUNI = url;
4255 urlUNI.convertTo(String::T_UNICODESAFE);
4257 String genreUNI = genre;
4258 genreUNI.convertTo(String::T_UNICODESAFE);
4260 String descUNI = desc;
4261 descUNI.convertTo(String::T_UNICODESAFE);
4264 commentUNI = comment;
4265 commentUNI.convertTo(String::T_UNICODESAFE);
// printf-style format: the argument list must match the twelve conversions in
// order.
4271 return new XML::Node("channel name=\"%s\" id=\"%s\" bitrate=\"%d\" type=\"%s\" genre=\"%s\" desc=\"%s\" url=\"%s\" uptime=\"%d\" comment=\"%s\" skips=\"%d\" age=\"%d\" bcflags=\"%d\"",
4275 getTypeStr(contentType),
4287 // -----------------------------------
// Builds a "channel" XML node that carries the search attributes of this
// ChanInfo and returns it. Attributes are appended to buf as text. The name
// and genre attributes are emitted only when non-empty.
// The node is allocated with new; presumably the caller takes ownership -
// confirm against callers.
// NOTE(review): the declaration and size of buf, the closing quotes of each
// attribute and the condition guarding the id attribute are not visible in
// this excerpt. strcat performs no bounds checking - confirm buf is large
// enough for the HTML-converted name and genre plus the id.
4288 XML::Node *ChanInfo::createQueryXML()
// Work on HTML-converted copies so the members stay unchanged.
4294 String nameHTML = name;
4295 nameHTML.convertTo(String::T_HTML);
4296 String genreHTML = genre;
4297 genreHTML.convertTo(String::T_HTML);
4300 if (!nameHTML.isEmpty())
4302 strcat(buf," name=\"");
4303 strcat(buf,nameHTML.cstr());
4307 if (!genreHTML.isEmpty())
4309 strcat(buf," genre=\"");
4310 strcat(buf,genreHTML.cstr());
4317 strcat(buf," id=\"");
4323 return new XML::Node("channel %s",buf);
4326 // -----------------------------------
// Builds a reduced "channel" XML node with only the id, uptime, skips and age
// attributes and returns it.
// The node is allocated with new; presumably the caller takes ownership -
// confirm against callers.
// NOTE(review): the arguments supplying the four values are not visible in
// this excerpt.
4327 XML::Node *ChanInfo::createRelayChannelXML()
4334 return new XML::Node("channel id=\"%s\" uptime=\"%d\" skips=\"%d\" age=\"%d\"",
4340 }// -----------------------------------
// Builds a "track" XML node from the track fields and returns it.
// The node is allocated with new; presumably the caller takes ownership -
// confirm against callers.
// NOTE(review): the arguments passed after the format string are not visible
// in this excerpt; presumably they are the five converted copies below, in
// format order - confirm.
4341 XML::Node *ChanInfo::createTrackXML()
// Each field is copied and converted to T_UNICODESAFE, leaving the track
// members themselves untouched.
4343 String titleUNI = track.title;
4344 titleUNI.convertTo(String::T_UNICODESAFE);
4346 String artistUNI = track.artist;
4347 artistUNI.convertTo(String::T_UNICODESAFE);
4349 String albumUNI = track.album;
4350 albumUNI.convertTo(String::T_UNICODESAFE);
4352 String genreUNI = track.genre;
4353 genreUNI.convertTo(String::T_UNICODESAFE);
4355 String contactUNI = track.contact;
4356 contactUNI.convertTo(String::T_UNICODESAFE);
4360 return new XML::Node("track title=\"%s\" artist=\"%s\" album=\"%s\" genre=\"%s\" contact=\"%s\"",
4369 // -----------------------------------
// Overload of init taking an XML node.
// NOTE(review): the body is not visible in this excerpt; presumably it resets
// the object and then loads the fields from n - confirm.
4370 void ChanInfo::init(XML::Node *n)
4376 // -----------------------------------
// Updates this ChanInfo from the attributes of XML node n (name, genre, url,
// desc, bitrate, type, id, comment) and looks up a child "track" node.
// NOTE(review): the handling of br and of the track node tn is not visible in
// this excerpt.
4377 void ChanInfo::updateFromXML(XML::Node *n)
4379 String typeStr,idStr;
4381 readXMLString(name,n,"name");
4382 readXMLString(genre,n,"genre");
4383 readXMLString(url,n,"url");
4384 readXMLString(desc,n,"desc");
4387 int br = n->findAttrInt("bitrate");
// The content type is replaced only when a non-empty type attribute was read.
4391 readXMLString(typeStr,n,"type");
4392 if (!typeStr.isEmpty())
4393 contentType = getTypeFromStr(typeStr.cstr());
// Likewise the channel ID is replaced only when a non-empty id was read.
4396 readXMLString(idStr,n,"id");
4397 if (!idStr.isEmpty())
4398 id.fromStr(idStr.cstr());
4400 readXMLString(comment,n,"comment");
4402 XML::Node *tn = n->findNode("track");
4408 // -----------------------------------
// Overload of init taking a C string n, a GnuID cid, a content TYPE tp and an
// int br.
// NOTE(review): the body is not visible in this excerpt; judging by the
// parameter names these are the channel name, ID, content type and bitrate -
// confirm.
4409 void ChanInfo::init(const char *n, GnuID &cid, TYPE tp, int br)
4419 // -----------------------------------
// Overload of init taking a single C string fn.
// NOTE(review): the body is not visible in this excerpt; judging by the
// parameter name fn is a file name - confirm.
4420 void ChanInfo::init(const char *fn)
4427 // -----------------------------------
// Reads an ASX playlist from the stream: the content is parsed as XML and the
// "entry" children of the root are inspected for a "ref" node with an "href"
// attribute.
// NOTE(review): the XML read call, the loop over the nodes, the null checks
// on rf/hr and what is done with hr are not visible in this excerpt.
4428 void PlayList::readASX(Stream &in)
4430 LOG_DEBUG("Reading ASX");
// A StreamException raised while reading is deliberately swallowed; per the
// TODO it is the normal outcome at end of stream.
4436 }catch(StreamException &) {} // TODO: eof is NOT handled properly in sockets - always get error at end
// Start at the first child of the root node.
4440 XML::Node *n = xml.root->child;
// Node name comparison ignores case.
4443 if (stricmp("entry",n->getName())==0)
4445 XML::Node *rf = n->findNode("ref");
4448 char *hr = rf->findAttr("href");
4452 //LOG("asx url %s",hr);
4461 // -----------------------------------
// Reads a playlist line by line and picks out the lines whose first four
// characters are "file" (case-insensitive).
// NOTE(review): the declaration of tmp and what is done with p are not visible
// in this excerpt; presumably the text after '=' is added as a URL - confirm.
4462 void PlayList::readSCPLS(Stream &in)
4465 while (in.readLine(tmp,sizeof(tmp)))
4467 if (strnicmp(tmp,"file",4)==0)
// Locate the first '=' in the line.
4469 char *p = strstr(tmp,"=");
4475 // -----------------------------------
// Reads a playlist from the stream one line at a time into tmp until readLine
// reports no more data.
// NOTE(review): the declaration of tmp and the loop body are not visible in
// this excerpt; presumably each line is added as a URL - confirm.
4476 void PlayList::readPLS(Stream &in)
4479 while (in.readLine(tmp,sizeof(tmp)))
4485 // -----------------------------------
// Writes the playlist to out in "[playlist]" format: a header, the entry
// count, then File/Title/Length lines for each of the numURLs entries, and a
// trailing version line.
4486 void PlayList::writeSCPLS(Stream &out)
4488 out.writeLine("[playlist]");
4490 out.writeLineF("NumberOfEntries=%d",numURLs);
4492 for(int i=0; i<numURLs; i++)
// Entry numbers in the output are 1-based.
4494 out.writeLineF("File%d=%s",i+1,urls[i].cstr());
4495 out.writeLineF("Title%d=%s",i+1,titles[i].cstr());
// Every entry is written with a length of -1.
4496 out.writeLineF("Length%d=-1",i+1);
4498 out.writeLine("Version=2");
4500 // -----------------------------------
// Writes each of the numURLs URLs to out on its own line.
4501 void PlayList::writePLS(Stream &out)
4503 for(int i=0; i<numURLs; i++)
// The URL is passed as an argument to "%s" rather than as the format string
// itself.
4504 out.writeLineF("%s",urls[i].cstr());
4506 // -----------------------------------
// Writes each of the numURLs URLs to out on its own line (the visible code is
// the same as in writePLS).
4507 void PlayList::writeRAM(Stream &out)
4509 for(int i=0; i<numURLs; i++)
// The URL is passed as an argument to "%s" rather than as the format string
// itself.
4510 out.writeLineF("%s",urls[i].cstr());
4513 // -----------------------------------
// Writes the playlist to out as an ASX 3.0 document with one <ENTRY>/<REF>
// pair per URL.
// NOTE(review): URLs are inserted into the href attribute as-is; no escaping
// is visible here - confirm URLs cannot contain '"' or '&'.
4514 void PlayList::writeASX(Stream &out)
4516 out.writeLine("<ASX Version=\"3.0\">");
4517 for(int i=0; i<numURLs; i++)
4519 out.writeLine("<ENTRY>");
4520 out.writeLineF("<REF href = \"%s\" />",urls[i].cstr());
4521 out.writeLine("</ENTRY>");
4523 out.writeLine("</ASX>");
4527 // -----------------------------------
// Adds a stream URL for the given channel to the playlist. The URL has the
// form "<path>/stream/<id-or-name><ext>" and is registered with the channel
// name as its title.
// NOTE(review): the declarations of url and idStr are not visible in this
// excerpt. sprintf writes straight into url's buffer without a length limit -
// confirm the buffer is large enough for path plus name.
4528 void PlayList::addChannel(const char *path, ChanInfo &info)
4534 info.id.toStr(idStr);
// Use the ID string when the channel ID is set, otherwise fall back to the
// channel name.
4535 char *nid = info.id.isSet()?idStr:info.name.cstr();
// The suffix comes from getTypeExt for the channel's content type.
4537 sprintf(url.cstr(),"%s/stream/%s%s",path,nid,ChanInfo::getTypeExt(info.contentType));
4538 addURL(url.cstr(),info.name);
4541 // -----------------------------------
// Resets the search criteria to their defaults. Visible defaults: firewalled
// hits are not used, the search is not restricted to trackers, and busy relays
// and busy controls are allowed.
// NOTE(review): further field resets are elided from this excerpt.
4542 void ChanHitSearch::init()
4546 useFirewalled = false;
4547 trackersOnly = false;
4548 useBusyRelays = true;
4549 useBusyControls = true;
// Disabled: seeding from the current time.
4553 //seed = sys->getTime();
4557 int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList *chl)
4564 static int base = 0x400;
4565 ChanHit tmpHit[MAX_RESULTS];
4566 static WLock seqLock;
4567 static unsigned int riSequence = 0;
4575 riSequence &= 0xffffff;
4578 Servent *s = servMgr->servents;
4580 if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
4581 && s->chanID.isSame(chl->info.id)) {
4582 int i = index % MAX_RESULTS;
4583 if (index < MAX_RESULTS
4584 || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
4585 s->serventHit.lastSendSeq = seq;
4586 tmpHit[i] = s->serventHit;
4587 tmpHit[i].host = s->serventHit.rhost[0];
4594 ChanHit *hit = chl->hit;
4597 if (hit->host.ip && !hit->dead){
4599 (!exID.isSame(hit->sessionID))
4602 && (!hit->firewalled)
4603 && (hit->numHops != 0)
4605 if ( (hit->rhost[0].ip == host1.ip)
4606 && hit->rhost[1].isValid()
4607 && (host2.ip != hit->rhost[1].ip)
4610 best[0].host = hit->rhost[1];
4613 if ((hit->rhost[0].ip == host2.ip) && hit->rhost[1].isValid()){
4615 best[0].host = hit->rhost[1];
4619 loop = (index / MAX_RESULTS) + 1;
4620 //prob = (float)1 / (float)loop;
4622 //rnd = (float)rand() / (float)RAND_MAX;
4623 rnd = rand() % base;
4624 if (hit->numHops == 1){
4626 if (tmpHit[index % MAX_RESULTS].numHops == 1){
4628 tmpHit[index % MAX_RESULTS] = *hit;
4629 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4633 tmpHit[index % MAX_RESULTS] = *hit;
4634 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4639 if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
4640 tmpHit[index % MAX_RESULTS] = *hit;
4641 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4647 // hit->host.toStr(tmp);
4648 // LOG_DEBUG("TEST %s: %f %f", tmp, rnd, prob);
4654 if (index > MAX_RESULTS){
4660 /* int use[MAX_RESULTS];
4661 memset(use, 0, sizeof(use));
4663 for (i = 0; i < cnt; i++){
4667 for (i = 0; i < cnt; i++){
4670 // LOG_DEBUG("%d",r);
4678 for (i = 0; i < cnt; i++){
4679 // LOG_DEBUG("%d", use[i]);
4680 best[use[i]] = tmpHit[i];
4683 int use[MAX_RESULTS];
4685 for (i = 0; i < cnt; i++) {
4686 use[i] = (i + seq) % cnt;
4689 for (i = 0; i < cnt; i++){
4690 // LOG_DEBUG("%d", use[i]);
4691 best[use[i]] = tmpHit[i];
4693 // for (i = 0; i < cnt; i++){
4695 // best[i].host.toStr(tmp);
4696 // LOG_DEBUG("Relay info: Hops = %d, %s", best[i].numHops, tmp);