OSDN Git Service

DoS耐性を向上
[peercast-im/PeerCastIM.git] / core / common / servent.cpp
1 // ------------------------------------------------
2 // File : servent.cpp
3 // Date: 4-apr-2002
4 // Author: giles
5 // Desc: 
6 //              Servents are the actual connections between clients. They do the handshaking,
7 //              transfering of data and processing of GnuPackets. Each servent has one socket allocated
8 //              to it on connect, it uses this to transfer all of its data.
9 //
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
16
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
23
24 #include <stdlib.h>
25 #include "servent.h"
26 #include "sys.h"
27 #include "gnutella.h"
28 #include "xml.h"
29 #include "html.h"
30 #include "http.h"
31 #include "stats.h"
32 #include "servmgr.h"
33 #include "peercast.h"
34 #include "atom.h"
35 #include "pcp.h"
36 #include "version2.h"
37 #ifdef _DEBUG
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
40 #define new DEBUG_NEW
41 #endif
42
43 #include "win32/seh.h"
44
45
// Write timeout applied to direct connections.
// NOTE(review): unit is presumably seconds; the constant is not used in this part of the file - confirm at the use site.
const int DIRECT_WRITE_TIMEOUT = 60;
47
48 // -----------------------------------
49 char *Servent::statusMsgs[]=
50 {
51         "NONE",
52                 "CONNECTING",
53         "PROTOCOL",
54         "HANDSHAKE",
55         "CONNECTED",
56         "CLOSING",
57                 "LISTENING",
58                 "TIMEOUT",
59                 "REFUSED",
60                 "VERIFIED",
61                 "ERROR",
62                 "WAIT",
63                 "FREE"
64 };
65
66 // -----------------------------------
67 char *Servent::typeMsgs[]=
68 {
69                 "NONE",
70         "INCOMING",
71         "SERVER",
72                 "RELAY",
73                 "DIRECT",
74                 "COUT",
75                 "CIN",
76                 "PGNU"
77 };
78 // -----------------------------------
79 bool    Servent::isPrivate() 
80 {
81         Host h = getHost();
82         return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
83 }
84 // -----------------------------------
85 bool    Servent::isAllowed(int a) 
86 {
87         Host h = getHost();
88
89         if (servMgr->isFiltered(ServFilter::F_BAN,h))
90                 return false;
91
92         return (allow&a)!=0;
93 }
94
95 // -----------------------------------
96 bool    Servent::isFiltered(int f) 
97 {
98         Host h = getHost();
99         return servMgr->isFiltered(f,h);
100 }
101
102 int servent_count = 1;
103 // -----------------------------------
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
107 ,seenIDs(MAX_HASH)
108 ,serventIndex(index)
109 ,sock(NULL)
110 ,next(NULL)
111 {
112         reset();
113         servent_id = servent_count++;
114         lastSkipTime = 0;
115         lastSkipCount = 0;
116         waitPort = 0;
117 }
118
119 // -----------------------------------
120 Servent::~Servent()
121 {       
122         
123 }
124 // -----------------------------------
125 void    Servent::kill() 
126 {
127         thread.active = false;
128                 
129         setStatus(S_CLOSING);
130
131         if (pcpStream)
132         {
133                 PCPStream *pcp = pcpStream;
134                 pcpStream = NULL;
135                 pcp->kill();
136                 delete pcp;
137         }
138
139         chanMgr->hitlistlock.on();
140         ChanHitList *chl = chanMgr->findHitListByID(chanID);
141         if (chl) {
142                 ChanHit *chh = chl->hit;
143                 ChanHit *prev = NULL;
144                 while (chh) {
145                         if (chh->servent_id == this->servent_id){
146                                 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
147                                         chh->numHops = 0;
148                                         chh->numListeners = 0;
149                                         chh->numRelays = 0;
150                                         prev = chh;
151                                         chh = chh->next;
152                                 } else {
153                                         ChanHit *next = chh->next;
154                                         if (prev)
155                                                 prev->next = next;
156                                         else
157                                                 chl->hit = next;
158
159                                         delete chh;
160                                         chh = next;
161                                 }
162                         } else {
163                                 prev = chh;
164                                 chh = chh->next;
165                         }
166                 }
167         }
168         chanMgr->hitlistlock.off();
169
170         if (sock)
171         {
172                 sock->close();
173                 delete sock;
174                 sock = NULL;
175         }
176
177         if (pushSock)
178         {
179                 pushSock->close();
180                 delete pushSock;
181                 pushSock = NULL;
182         }
183
184 //      thread.unlock();
185
186         if (type != T_SERVER)
187         {
188                 reset();
189                 setStatus(S_FREE);
190         }
191
192 }
193 // -----------------------------------
194 void    Servent::abort() 
195 {
196         thread.active = false;
197         if (sock)
198         {
199                 sock->close();
200         }
201
202 }
203
204 // -----------------------------------
205 void Servent::reset()
206 {
207
208         remoteID.clear();
209
210         servPort = 0;
211
212         pcpStream = NULL;
213
214         flowControl = false;
215         networkID.clear();
216
217         chanID.clear();
218
219         outputProtocol = ChanInfo::SP_UNKNOWN;
220
221         agent.clear();
222         sock = NULL;
223         allow = ALLOW_ALL;
224         syncPos = 0;
225         addMetadata = false;
226         nsSwitchNum = 0;
227         pack.func = 255;
228         lastConnect = lastPing = lastPacket = 0;
229         loginPassword.clear();
230         loginMount.clear();
231         bytesPerSecond = 0;
232         priorityConnect = false;
233         pushSock = NULL;
234         sendHeader = true;
235
236         outPacketsNorm.reset();
237         outPacketsPri.reset();
238
239         seenIDs.clear();
240
241         status = S_NONE;
242         type = T_NONE;
243
244         channel_id = 0;
245
246         serventHit.init();
247 }
248 // -----------------------------------
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
250 {
251
252         if  (      (type == t) 
253                         && (isConnected())
254                         && (!cid.isSet() || chanID.isSame(cid))
255                         && (!sid.isSet() || !sid.isSame(remoteID))
256                         && (pcpStream != NULL)
257                 )
258         {
259                 return pcpStream->sendPacket(pack,did);
260         }
261         return false;
262 }
263
264
265 // -----------------------------------
266 bool Servent::acceptGIV(ClientSocket *givSock)
267 {
268         if (!pushSock)
269         {
270                 pushSock = givSock;
271                 return true;
272         }else
273                 return false;
274 }
275
276 // -----------------------------------
277 Host Servent::getHost()
278 {
279         Host h(0,0);
280
281         if (sock)
282                 h = sock->host;
283
284         return h;
285 }
286
287 // -----------------------------------
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
289 {
290         lock.on();
291
292         bool r=false;
293         if (pri)
294                 r = outPacketsPri.write(p);
295         else
296         {
297                 if (servMgr->useFlowControl)
298                 {
299                         int per = outPacketsNorm.percentFull();
300                         if (per > 50)
301                                 flowControl = true;
302                         else if (per < 25)
303                                 flowControl = false;
304                 }
305
306
307                 bool send=true;
308                 if (flowControl)
309                 {
310                         // if in flowcontrol, only allow packets with less of a hop count than already in queue
311                         if (p.hops >= outPacketsNorm.findMinHop())
312                                 send = false;
313                 }
314
315                 if (send)
316                         r = outPacketsNorm.write(p);
317         }
318
319         lock.off();
320         return r;
321 }
322
323 // -----------------------------------
324 bool Servent::initServer(Host &h)
325 {
326         try
327         {
328                 checkFree();
329
330                 status = S_WAIT;
331
332                 createSocket();
333
334                 sock->bind(h);
335
336                 thread.data = this;
337
338                 thread.func = serverProc;
339
340                 type = T_SERVER;
341
342                 if (!sys->startThread(&thread))
343                         throw StreamException("Can`t start thread");
344
345         }catch(StreamException &e)
346         {
347                 LOG_ERROR("Bad server: %s",e.msg);
348                 kill();
349                 return false;
350         }
351
352         return true;
353 }
354 // -----------------------------------
355 void Servent::checkFree()
356 {
357         if (sock)
358                 throw StreamException("Socket already set");
359         if (thread.active)
360                 throw StreamException("Thread already active");
361 }
362 // -----------------------------------
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
364 {
365
366         try{
367
368                 checkFree();
369
370                 type = T_INCOMING;
371                 sock = s;
372                 allow = a;
373                 thread.data = this;
374                 thread.func = incomingProc;
375                 thread.finish = false;
376
377                 setStatus(S_PROTOCOL);
378
379                 char ipStr[64];
380                 sock->host.toStr(ipStr);
381                 LOG_DEBUG("Incoming from %s",ipStr);
382
383                 if (!sys->startThread(&thread))
384                         throw StreamException("Can`t start thread");
385         }catch(StreamException &e)
386         {
387                 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
388                 //servMgr->shutdownTimer = 1;   
389                 kill();
390
391                 LOG_ERROR("INCOMING FAILED: %s",e.msg);
392
393         }
394 }
395
396 // -----------------------------------
397 void Servent::initOutgoing(TYPE ty)
398 {
399         try 
400         {
401                 checkFree();
402
403
404                 type = ty;
405
406                 thread.data = this;
407                 thread.func = outgoingProc;
408
409                 if (!sys->startThread(&thread))
410                         throw StreamException("Can`t start thread");
411
412         }catch(StreamException &e)
413         {
414                 LOG_ERROR("Unable to start outgoing: %s",e.msg);
415                 kill();
416         }
417 }
418
419 // -----------------------------------
420 void Servent::initPCP(Host &rh)
421 {
422         char ipStr[64];
423         rh.toStr(ipStr);
424         try 
425         {
426                 checkFree();
427
428             createSocket();
429
430                 type = T_COUT;
431
432                 sock->open(rh);
433
434                 if (!isAllowed(ALLOW_NETWORK))
435                         throw StreamException("Servent not allowed");
436
437                 thread.data = this;
438                 thread.func = outgoingProc;
439
440                 LOG_DEBUG("Outgoing to %s",ipStr);
441
442                 if (!sys->startThread(&thread))
443                         throw StreamException("Can`t start thread");
444
445         }catch(StreamException &e)
446         {
447                 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
448                 kill();
449         }
450 }
451
#if 0
// Disabled code: everything up to the matching #endif is excluded from the build.
// As written it would mark the servent T_STREAM, open a socket to `host`,
// require ALLOW_DATA permission and connect; unlike the other init* functions
// it has no try/catch, so exceptions would propagate to the caller.
// -----------------------------------
void    Servent::initChannelFetch(Host &host)
{
        type = T_STREAM;

        char ipStr[64];
        host.toStr(ipStr);

        checkFree();
         
        createSocket();
                
        sock->open(host);

                
        if (!isAllowed(ALLOW_DATA))     
                throw StreamException("Servent not allowed");
                
        sock->connect();
}
#endif
474
475 // -----------------------------------
476 void Servent::initGIV(Host &h, GnuID &id)
477 {
478         char ipStr[64];
479         h.toStr(ipStr);
480         try 
481         {
482                 checkFree();
483
484                 givID = id;
485
486             createSocket();
487
488                 sock->open(h);
489
490                 if (!isAllowed(ALLOW_NETWORK))
491                         throw StreamException("Servent not allowed");
492                 
493                 sock->connect();
494
495                 thread.data = this;
496                 thread.func = givProc;
497
498                 type = T_RELAY;
499
500                 if (!sys->startThread(&thread))
501                         throw StreamException("Can`t start thread");
502
503         }catch(StreamException &e)
504         {
505                 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
506                 kill();
507         }
508 }
509 // -----------------------------------
510 void Servent::createSocket()
511 {
512         if (sock)
513                 LOG_ERROR("Servent::createSocket attempt made while active");
514
515         sock = sys->createSocket();
516 }
517 // -----------------------------------
518 void Servent::setStatus(STATUS s)
519 {
520         if (s != status)
521         {
522                 status = s;
523
524                 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
525                         lastConnect = sys->getTime();
526         }
527
528 }
529
530 // -----------------------------------
531 void Servent::handshakeOut()
532 {
533     sock->writeLine(GNU_PEERCONN);
534
535         char str[64];
536     
537         sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
538     sock->writeLineF("%s %d",PCX_HS_PCP,1);
539
540         if (priorityConnect)
541             sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
542         
543         if (networkID.isSet())
544         {
545                 networkID.toStr(str);
546                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
547         }
548
549         servMgr->sessionID.toStr(str);
550         sock->writeLineF("%s %s",PCX_HS_ID,str);
551
552         
553     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
554         
555         sock->writeLine("");
556
557         HTTP http(*sock);
558
559         int r = http.readResponse();
560
561         if (r != 200)
562         {
563                 LOG_ERROR("Expected 200, got %d",r);
564                 throw StreamException("Unexpected HTTP response");
565         }
566
567
568         bool versionValid = false;
569
570         GnuID clientID;
571         clientID.clear();
572
573     while (http.nextHeader())
574     {
575                 LOG_DEBUG(http.cmdLine);
576
577                 char *arg = http.getArgStr();
578                 if (!arg)
579                         continue;
580
581                 if (http.isHeader(HTTP_HS_AGENT))
582                 {
583                         agent.set(arg);
584
585                         if (strnicmp(arg,"PeerCast/",9)==0)
586                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
587                 }else if (http.isHeader(PCX_HS_NETWORKID))
588                         clientID.fromStr(arg);
589     }
590
591         if (!clientID.isSame(networkID))
592                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
593
594         if (!versionValid)
595                 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
596
597
598     sock->writeLine(GNU_OK);
599     sock->writeLine("");
600
601 }
602
603
604 // -----------------------------------
605 void Servent::processOutChannel()
606 {
607 }
608
609
610 // -----------------------------------
611 void Servent::handshakeIn()
612 {
613
614         int osType=0;
615
616         HTTP http(*sock);
617
618
619         bool versionValid = false;
620         bool diffRootVer = false;
621
622         GnuID clientID;
623         clientID.clear();
624
625     while (http.nextHeader())
626     {
627                 LOG_DEBUG("%s",http.cmdLine);
628
629                 char *arg = http.getArgStr();
630                 if (!arg)
631                         continue;
632
633                 if (http.isHeader(HTTP_HS_AGENT))
634                 {
635                         agent.set(arg);
636
637                         if (strnicmp(arg,"PeerCast/",9)==0)
638                         {
639                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
640                                 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
641                         }
642                 }else if (http.isHeader(PCX_HS_NETWORKID))
643                 {
644                         clientID.fromStr(arg);
645
646                 }else if (http.isHeader(PCX_HS_PRIORITY))
647                 {
648                         priorityConnect = atoi(arg)!=0;
649
650                 }else if (http.isHeader(PCX_HS_ID))
651                 {
652                         GnuID id;
653                         id.fromStr(arg);
654                         if (id.isSame(servMgr->sessionID))
655                                 throw StreamException("Servent loopback");
656
657                 }else if (http.isHeader(PCX_HS_OS))
658                 {
659                         if (stricmp(arg,PCX_OS_LINUX)==0)
660                                 osType = 1;
661                         else if (stricmp(arg,PCX_OS_WIN32)==0)
662                                 osType = 2;
663                         else if (stricmp(arg,PCX_OS_MACOSX)==0)
664                                 osType = 3;
665                         else if (stricmp(arg,PCX_OS_WINAMP2)==0)
666                                 osType = 4;
667                 }
668
669     }
670
671         if (!clientID.isSame(networkID))
672                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
673
674         // if this is a priority connection and all incoming connections 
675         // are full then kill an old connection to make room. Otherwise reject connection.
676         //if (!priorityConnect)
677         {
678                 if (!isPrivate())
679                         if (servMgr->pubInFull())
680                                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
681         }
682
683         if (!versionValid)
684                 throw HTTPException(HTTP_SC_FORBIDDEN,403);
685
686     sock->writeLine(GNU_OK);
687
688     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
689
690         if (networkID.isSet())
691         {
692                 char idStr[64];
693                 networkID.toStr(idStr);
694                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
695         }
696
697         if (servMgr->isRoot)
698         {
699                 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
700                 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
701                 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
702                 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
703                 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
704
705
706                 if (diffRootVer)
707                 {
708                         sock->writeString(PCX_HS_DL);
709                         sock->writeLine(PCX_DL_URL);
710                 }
711
712                 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
713         }
714
715
716         char hostIP[64];
717         Host h = sock->host;
718         h.IPtoStr(hostIP);
719     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
720
721
722     sock->writeLine("");
723
724
725         while (http.nextHeader());
726 }
727
728 // -----------------------------------
729 bool    Servent::pingHost(Host &rhost,GnuID &rsid)
730 {
731         char ipstr[64];
732         rhost.toStr(ipstr);
733         LOG_DEBUG("Ping host %s: trying..",ipstr);
734         ClientSocket *s=NULL;
735         bool hostOK=false;
736         try
737         {
738                 s = sys->createSocket();
739                 if (!s)
740                         return false;
741                 else
742                 {
743
744                         s->setReadTimeout(15000);
745                         s->setWriteTimeout(15000);
746                         s->open(rhost);
747                         s->connect();
748
749                         AtomStream atom(*s);
750
751                         atom.writeInt(PCP_CONNECT,1);
752                         atom.writeParent(PCP_HELO,1);
753                                 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
754
755                         GnuID sid;
756                         sid.clear();
757
758                         int numc,numd;
759                         ID4 id = atom.read(numc,numd);
760                         if (id == PCP_OLEH)
761                         {
762                                 for(int i=0; i<numc; i++)
763                                 {
764                                         int c,d;
765                                         ID4 pid = atom.read(c,d);
766                                         if (pid == PCP_SESSIONID)
767                                                 atom.readBytes(sid.id,16,d);
768                                         else
769                                                 atom.skip(c,d);
770                                 }
771                         }else
772                         {
773                                 LOG_DEBUG("Ping response: %s",id.getString().str());
774                                 throw StreamException("Bad ping response");
775                         }
776
777                         if (!sid.isSame(rsid))
778                                 throw StreamException("SIDs don`t match");
779
780                         hostOK = true;
781                         LOG_DEBUG("Ping host %s: OK",ipstr);
782                         atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
783
784
785                 }
786         }catch(StreamException &e)
787         {
788                 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
789         }
790         if (s)
791         {
792                 s->close();
793                 delete s;
794         }
795
796         if (!hostOK)
797                 rhost.port = 0;
798
799         return true;
800 }
801
// File-scope lock held by handshakeStream() around its canStream() check and
// the status update that follows it.
WLock canStreamLock;
803
804 // -----------------------------------
805 bool Servent::handshakeStream(ChanInfo &chanInfo)
806 {
807
808         HTTP http(*sock);
809
810
811         bool gotPCP=false;
812         unsigned int reqPos=0;
813         unsigned short listenPort = 0;
814
815         nsSwitchNum=0;
816
817         while (http.nextHeader())
818         {
819                 char *arg = http.getArgStr();
820                 if (!arg)
821                         continue;
822
823                 if (http.isHeader(PCX_HS_PCP))
824                         gotPCP = atoi(arg)!=0;
825                 else if (http.isHeader(PCX_HS_POS))
826                         reqPos = atoi(arg);
827                 else if (http.isHeader(PCX_HS_PORT))
828                         listenPort = (unsigned short)atoi(arg);
829                 else if (http.isHeader("icy-metadata"))
830                         addMetadata = atoi(arg) > 0;
831                 else if (http.isHeader(HTTP_HS_AGENT))
832                         agent = arg;
833                 else if (http.isHeader("Pragma"))
834                 {
835                         char *ssc = stristr(arg,"stream-switch-count=");
836                         char *so = stristr(arg,"stream-offset");
837
838                         if (ssc || so)
839                         {
840                                 nsSwitchNum=1;
841                                 //nsSwitchNum = atoi(ssc+20);
842                         }
843                 }
844
845                 LOG_DEBUG("Stream: %s",http.cmdLine);
846         }
847
848
849         if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
850                 outputProtocol = ChanInfo::SP_PEERCAST;
851
852         if (outputProtocol == ChanInfo::SP_HTTP)
853         {
854                 if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
855                           || (chanInfo.contentType == ChanInfo::T_WMA)
856                           || (chanInfo.contentType == ChanInfo::T_WMV)
857                           || (chanInfo.contentType == ChanInfo::T_ASX)
858                         )
859                 outputProtocol = ChanInfo::SP_MMS;
860         }
861
862
863         bool chanFound=false;
864         bool chanReady=false;
865
866         ChanHit *sourceHit = NULL;
867
868         Channel *ch = chanMgr->findChannelByID(chanInfo.id);
869         if (ch)
870         {
871                 sendHeader = true;
872                 if (reqPos || !isIndexTxt(&chanInfo))
873                 {
874                         streamPos = ch->rawData.findOldestPos(reqPos);
875                         //streamPos = ch->rawData.getLatestPos();
876                 }else
877                 {
878                         streamPos = ch->rawData.getLatestPos();
879                 }
880
881                 chanID = chanInfo.id;
882                 serventHit.host.ip = getHost().ip;
883                 serventHit.host.port = listenPort;
884                 if (serventHit.host.globalIP())
885                         serventHit.rhost[0] = serventHit.host;
886                 else
887                         serventHit.rhost[1] = serventHit.host;
888                 serventHit.chanID = chanID;
889
890                 canStreamLock.on();
891                 chanReady = canStream(ch);
892                 if (0 && !chanReady && ch->isPlaying())
893                 {
894                         if (ch->info.getUptime() > 60
895                                 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
896                         {
897                                 sourceHit = &ch->sourceHost;  // send source host info
898
899                                 if (listenPort)
900                                 {
901                                         // connect "this" host later
902                                         chanMgr->addHit(serventHit);
903                                 }
904
905                                 char tmp[50];
906                                 getHost().toStr(tmp);
907                                 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
908                                 ch->bump = true;
909                         }
910                         else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
911                         {
912                                 chanReady = canStream(ch);
913                                 if (!chanReady)
914                                         LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
915                         }
916                 }
917                 if (!chanReady) type = T_INCOMING;
918                 thread.active = chanReady;
919                 setStatus(S_CONNECTED);
920                 canStreamLock.off();
921                 channel_id = ch->channel_id;
922
923                 //JP-Patch add-s
924                 if (servMgr->isCheckPushStream())
925                 {
926                         if (chanReady == true)
927                         {
928                                 Host h = getHost();
929
930                                 if (!h.isLocalhost()) 
931                                 {
932                                         do 
933                                         {
934                                                 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL) 
935                                                 {                                               
936                                                         char strip[256];
937                                                         h.toStr(strip);
938                                                         LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
939                                                         chanReady = false;
940                                                         break;
941                                                 }
942
943 /*                                              ChanHitList *hits[ChanMgr::MAX_HITLISTS];
944                                                 int numHits=0;
945                                                 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
946                                                 {
947                                                         ChanHitList *chl = &chanMgr->hitlists[i];
948                                                         if (chl->isUsed())
949                                                                 hits[numHits++] = chl;
950                                                 }
951                                                 bool isfw = false;
952                                                 int numRelay = 0;
953                                                 for(int i=0; i<numHits; i++)
954                                                 {
955                                                         ChanHitList *chl = hits[i];
956                                                         if (chl->isUsed())
957                                                         {
958                                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
959                                                                 {
960                                                                         ChanHit *hit = &chl->hits[j];
961                                                                         if (hit->host.isValid() && (h.ip == hit->host.ip)) 
962                                                                         {
963                                                                                 if (hit->firewalled)
964                                                                                         isfw = true;
965                                                                                 numRelay = hit->numRelays;
966                                                                         }
967                                                                 }
968                                                         }
969                                                 }
970                                                 if ((isfw == true) && (numRelay == 0))
971                                                 {
972                                                         char strip[256];
973                                                         h.toStr(strip);
974                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
975                                                         chanReady = false;
976                                                 }*/
977
978                                                 ChanHitList *chl = chanMgr->findHitList(chanInfo);
979                                                 ChanHit *hit = (chl ? chl->hit : NULL);
980                                                 while(hit){
981                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
982                                                         {
983                                                                 if ((hit->firewalled) && (hit->numRelays == 0)){
984                                                                         char strip[256];
985                                                                         h.toStr(strip);
986                                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
987                                                                         chanReady = false;
988                                                                 }
989                                                         }
990                                                         hit = hit->next;
991                                                 }
992                                         }while (0);
993                                 }               
994                         }
995                 }
996                 //JP-Patch add-e
997         }
998
999 //      LockBlock lockblock(chanMgr->hitlistlock);
1000
1001 //      lockblock.lockon();
1002         ChanHitList *chl = chanMgr->findHitList(chanInfo);
1003
1004         if (chl)
1005         {
1006                 chanFound = true;
1007         }
1008
1009
1010         bool result = false;
1011
1012         char idStr[64];
1013         chanInfo.id.toStr(idStr);
1014
1015         char sidStr[64];
1016         servMgr->sessionID.toStr(sidStr);
1017
1018         Host rhost = sock->host;
1019
1020
1021
1022
1023         AtomStream atom(*sock);
1024
1025
1026
1027         if (!chanFound)
1028         {
1029                 sock->writeLine(HTTP_SC_NOTFOUND);
1030             sock->writeLine("");
1031                 LOG_DEBUG("Sending channel not found");
1032                 return false;
1033         }
1034
1035
1036         if (!chanReady)
1037         {
1038                 if (outputProtocol == ChanInfo::SP_PCP)
1039                 {
1040
1041                         char tbuf[8196];
1042                         MemoryStream mem(tbuf, sizeof(tbuf));
1043                         mem.writeLine(HTTP_SC_UNAVAILABLE);
1044                         mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1045                         mem.writeLine("");
1046                         sock->write(tbuf, mem.getPosition());
1047
1048                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1049
1050                         char ripStr[64];
1051                         rhost.toStr(ripStr);
1052
1053                         LOG_DEBUG("Sending channel unavailable");
1054
1055                         ChanHitSearch chs;
1056
1057                         mem.rewind();
1058                         AtomStream atom2(mem);
1059
1060                         int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1061
1062                         if (sourceHit) {
1063                                 sourceHit->writeAtoms(atom2,chanInfo.id);       
1064                                 char tmp[50];
1065                                 sourceHit->host.toStr(tmp);
1066                                 LOG_DEBUG("relay info(sourceHit): %s", tmp);
1067                         }
1068
1069                         chanMgr->hitlistlock.on();
1070
1071                         chl = chanMgr->findHitList(chanInfo);
1072
1073                         if (chl && !sourceHit)
1074                         {
1075                                 ChanHit best;
1076                                 
1077                                 // search for up to 8 other hits
1078                                 int cnt=0;
1079                                 int i;
1080                                 for(i=0; i<8; i++)
1081                                 {
1082                                         best.init();
1083
1084
1085                                         // find best hit this network if local IP
1086                                         if (!rhost.globalIP())
1087                                         {
1088                                                 chs.init();
1089                                                 chs.matchHost = servMgr->serverHost;
1090                                                 chs.waitDelay = 2;
1091                                                 chs.excludeID = remoteID;
1092                                                 if (chl->pickHits(chs)){
1093                                                         best = chs.best[0];
1094                                                         LOG_DEBUG("find best hit this network if local IP");
1095                                                 }
1096                                         }
1097
1098                                         // find best hit on same network
1099                                         if (!best.host.ip)
1100                                         {
1101                                                 chs.init();
1102                                                 chs.matchHost = rhost;
1103                                                 chs.waitDelay = 2;
1104                                                 chs.excludeID = remoteID;
1105                                                 if (chl->pickHits(chs)){
1106                                                         best = chs.best[0];
1107                                                         LOG_DEBUG("find best hit on same network");
1108                                                 }
1109
1110                                         }
1111
1112                                         // find best hit on other networks
1113 /*                                      if (!best.host.ip)
1114                                         {
1115                                                 chs.init();
1116                                                 chs.waitDelay = 2;
1117                                                 chs.excludeID = remoteID;
1118                                                 if (chl->pickHits(chs)){
1119                                                         best = chs.best[0];
1120                                                         LOG_DEBUG("find best hit on other networks");
1121                                                 }
1122
1123                                         }*/
1124                                         
1125                                         if (!best.host.ip)
1126                                                 break;
1127
1128                                         best.writeAtoms(atom2,chanInfo.id);                        
1129                                         cnt++;
1130                                 }
1131
1132                                 if (!best.host.ip){
1133                                         char tmp[50];
1134 //                                      chanMgr->hitlistlock.on();
1135                                         int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1136 //                                      chanMgr->hitlistlock.off();
1137                                         for (int i = 0; i < rhcnt; i++){
1138                                                 chs.best[i].writeAtoms(atom2, chanInfo.id);
1139                                                 chs.best[i].host.toStr(tmp);
1140                                                 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1141                                                 best.host.ip = chs.best[i].host.ip;
1142                                         }
1143                                         cnt += rhcnt;
1144                                 }
1145
1146                                 if (cnt)
1147                                 {
1148                                         LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1149
1150                                 }
1151                                 else if (rhost.port)
1152                                 {
1153                                         // find firewalled host
1154                                         chs.init();
1155                                         chs.waitDelay = 30;
1156                                         chs.useFirewalled = true;
1157                                         chs.excludeID = remoteID;
1158                                         if (chl->pickHits(chs))
1159                                         {
1160                                                 best = chs.best[0];
1161                                                 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1162                                                 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1163                                         }
1164                                 } 
1165
1166                                 // if all else fails, use tracker
1167                                 if (!best.host.ip)
1168                                 {
1169                                         // find best tracker on this network if local IP
1170                                         if (!rhost.globalIP())
1171                                         {
1172                                                 chs.init();
1173                                                 chs.matchHost = servMgr->serverHost;
1174                                                 chs.trackersOnly = true;
1175                                                 chs.excludeID = remoteID;
1176                                                 if (chl->pickHits(chs))
1177                                                         best = chs.best[0];
1178
1179                                         }
1180
1181                                         // find local tracker
1182                                         if (!best.host.ip)
1183                                         {
1184                                                 chs.init();
1185                                                 chs.matchHost = rhost;
1186                                                 chs.trackersOnly = true;
1187                                                 chs.excludeID = remoteID;
1188                                                 if (chl->pickHits(chs))
1189                                                         best = chs.best[0];
1190                                         }
1191
1192                                         // find global tracker
1193                                         if (!best.host.ip)
1194                                         {
1195                                                 chs.init();
1196                                                 chs.trackersOnly = true;
1197                                                 chs.excludeID = remoteID;
1198                                                 if (chl->pickHits(chs))
1199                                                         best = chs.best[0];
1200                                         }
1201
1202                                         if (best.host.ip)
1203                                         {
1204                                                 best.writeAtoms(atom2,chanInfo.id);                             
1205                                                 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1206                                         }else if (rhost.port)
1207                                         {
1208                                                 // find firewalled tracker
1209                                                 chs.init();
1210                                                 chs.useFirewalled = true;
1211                                                 chs.trackersOnly = true;
1212                                                 chs.excludeID = remoteID;
1213                                                 chs.waitDelay = 30;
1214                                                 if (chl->pickHits(chs))
1215                                                 {
1216                                                         best = chs.best[0];
1217                                                         int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1218                                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1219                                                 }
1220                                         }
1221
1222                                 }
1223
1224
1225                         }
1226
1227                         chanMgr->hitlistlock.off();
1228
1229                         // return not available yet code
1230                         atom2.writeInt(PCP_QUIT,error);
1231                         sock->write(tbuf, mem.getPosition());
1232                         result = false;
1233
1234                         /*
1235                         char c[512];
1236                         // wait disconnect from other host
1237                         try{
1238                                 while(sock->read(c, sizeof(c))){
1239                                         sys->sleep(10);
1240                                 }
1241                         }catch(StreamException &e){
1242                                 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1243                         }
1244                         */
1245                 }else
1246                 {
1247                         LOG_DEBUG("Sending channel unavailable");
1248                         sock->writeLine(HTTP_SC_UNAVAILABLE);
1249                         sock->writeLine("");
1250                         result = false;
1251                 }
1252
1253         } else {
1254
1255                 if (chanInfo.contentType != ChanInfo::T_MP3)
1256                         addMetadata=false;
1257
1258                 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))               // winamp mp3 metadata check
1259                 {
1260
1261                         sock->writeLine(ICY_OK);
1262
1263                         sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1264                         sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1265                         sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1266                         sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1267                         sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1268                         sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1269                         sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1270
1271                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1272
1273                 }else
1274                 {
1275
1276                         sock->writeLine(HTTP_SC_OK);
1277
1278                         if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1279                         {
1280                                 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1281
1282                                 sock->writeLine("Accept-Ranges: none");
1283
1284                                 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1285                                 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1286                                 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1287                                 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1288                                 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1289                                 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1290                         }
1291
1292
1293                         if (outputProtocol == ChanInfo::SP_HTTP)
1294                         {
1295                                 switch (chanInfo.contentType)
1296                                 {
1297                                         case ChanInfo::T_OGG:
1298                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1299                                                 break;
1300                                         case ChanInfo::T_MP3:
1301                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1302                                                 break;
1303                                         case ChanInfo::T_MOV:
1304                                                 sock->writeLine("Connection: close");
1305                                                 sock->writeLine("Content-Length: 10000000");
1306                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1307                                                 break;
1308                                         case ChanInfo::T_MPG:
1309                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1310                                                 break;
1311                                         case ChanInfo::T_NSV:
1312                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1313                                                 break;
1314                                         case ChanInfo::T_ASX:
1315                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1316                                                 break;
1317                                         case ChanInfo::T_WMA:
1318                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1319                                                 break;
1320                                         case ChanInfo::T_WMV:
1321                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1322                                                 break;
1323                                 }
1324                         } else if (outputProtocol == ChanInfo::SP_MMS)
1325                         {
1326                                 sock->writeLine("Server: Rex/9.0.0.2980");
1327                                 sock->writeLine("Cache-Control: no-cache");
1328                                 sock->writeLine("Pragma: no-cache");
1329                                 sock->writeLine("Pragma: client-id=3587303426");
1330                                 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1331
1332                                 if (nsSwitchNum)
1333                                 {
1334                                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1335                                 }else
1336                                 {
1337                                         if (agent.contains("Android"))
1338                                         {
1339                                                 LOG_DEBUG("INFO: Android client detected.");
1340                                                 sock->writeLineF("%s %s", HTTP_HS_CONTENT, MIME_WMV);
1341                                         } else
1342                                         {
1343                                                 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1344                                                 if (ch)
1345                                                         sock->writeLineF("Content-Length: %d",ch->headPack.len);
1346                                                 sock->writeLine("Connection: Keep-Alive");
1347                                         }
1348                                 }
1349                         
1350                         } else if (outputProtocol == ChanInfo::SP_PCP)
1351                         {
1352                                 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1353                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1354
1355                         }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1356                         {
1357                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1358                         }
1359                 }
1360                 sock->writeLine("");
1361                 result = true;
1362
1363                 if (gotPCP)
1364                 {
1365                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1366                         atom.writeInt(PCP_OK,0);
1367                         if (rhost.globalIP())
1368                                 serventHit.rhost[0] = rhost;
1369                         else
1370                                 serventHit.rhost[1] = rhost;
1371                         serventHit.sessionID = remoteID;
1372                         serventHit.numHops = 1;
1373                         chanMgr->addHit(serventHit);
1374                 }
1375
1376         }
1377
1378
1379
1380         return result;
1381 }
1382
1383 // -----------------------------------
1384 void Servent::handshakeGiv(GnuID &id)
1385 {
1386         if (id.isSet())
1387         {
1388                 char idstr[64];
1389                 id.toStr(idstr);
1390                 sock->writeLineF("GIV /%s",idstr);
1391         }else
1392                 sock->writeLine("GIV");
1393
1394         sock->writeLine("");
1395 }
1396
1397
1398 // -----------------------------------
1399 void Servent::processGnutella()
1400 {
1401         type = T_PGNU;
1402
1403         //if (servMgr->isRoot && !servMgr->needConnections())
1404         if (servMgr->isRoot)
1405         {
1406                 processRoot();
1407                 return;
1408         }
1409
1410
1411
1412         gnuStream.init(sock);
1413         setStatus(S_CONNECTED);
1414
1415         if (!servMgr->isRoot)
1416         {
1417                 chanMgr->broadcastRelays(this, 1, 1);
1418                 GnuPacket *p;
1419
1420                 if ((p=outPacketsNorm.curr()))  
1421                         gnuStream.sendPacket(*p);
1422                 return;
1423         }
1424
1425         gnuStream.ping(2);
1426
1427 //      if (type != T_LOOKUP)
1428 //              chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1429
1430         lastPacket = lastPing = sys->getTime();
1431         bool doneBigPing=false;
1432
1433         const unsigned int      abortTimeoutSecs = 60;          // abort connection after 60 secs of no activitiy
1434         const unsigned int      packetTimeoutSecs = 30;         // ping connection after 30 secs of no activity
1435
1436         unsigned int currBytes=0;
1437         unsigned int lastWait=0;
1438
1439         unsigned int lastTotalIn=0,lastTotalOut=0;
1440
1441         while (thread.active && sock->active())
1442         {
1443
1444                 if (sock->readReady())
1445                 {
1446                         lastPacket = sys->getTime();
1447
1448                         if (gnuStream.readPacket(pack))
1449                         {
1450                                 char ipstr[64];
1451                                 sock->host.toStr(ipstr);
1452
1453                                 GnuID routeID;
1454                                 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1455
1456                                 if (pack.func != GNU_FUNC_PONG)
1457                                         if (servMgr->seenPacket(pack))
1458                                                 ret = GnuStream::R_DUPLICATE;
1459
1460                                 seenIDs.add(pack.id);
1461
1462
1463                                 if (ret == GnuStream::R_PROCESS)
1464                                 {
1465                                         GnuID routeID;
1466                                         ret = gnuStream.processPacket(pack,this,routeID);
1467
1468                                         if (flowControl && (ret == GnuStream::R_BROADCAST))
1469                                                 ret = GnuStream::R_DROP;
1470
1471                                 }
1472
1473                                 switch(ret)
1474                                 {
1475                                         case GnuStream::R_BROADCAST:
1476                                                 if (servMgr->broadcast(pack,this))
1477                                                         stats.add(Stats::NUMBROADCASTED);
1478                                                 else
1479                                                         stats.add(Stats::NUMDROPPED);
1480                                                 break;
1481                                         case GnuStream::R_ROUTE:
1482                                                 if (servMgr->route(pack,routeID,NULL))
1483                                                         stats.add(Stats::NUMROUTED);
1484                                                 else
1485                                                         stats.add(Stats::NUMDROPPED);
1486                                                 break;
1487                                         case GnuStream::R_ACCEPTED:
1488                                                 stats.add(Stats::NUMACCEPTED);
1489                                                 break;
1490                                         case GnuStream::R_DUPLICATE:
1491                                                 stats.add(Stats::NUMDUP);
1492                                                 break;
1493                                         case GnuStream::R_DEAD:
1494                                                 stats.add(Stats::NUMDEAD);
1495                                                 break;
1496                                         case GnuStream::R_DISCARD:
1497                                                 stats.add(Stats::NUMDISCARDED);
1498                                                 break;
1499                                         case GnuStream::R_BADVERSION:
1500                                                 stats.add(Stats::NUMOLD);
1501                                                 break;
1502                                         case GnuStream::R_DROP:
1503                                                 stats.add(Stats::NUMDROPPED);
1504                                                 break;
1505                                 }
1506
1507
1508                                 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1509
1510
1511
1512                         }else{
1513                                 LOG_ERROR("Bad packet");
1514                         }
1515                 }
1516
1517
1518                 GnuPacket *p;
1519
1520                 if ((p=outPacketsPri.curr()))                           // priority packet
1521                 {
1522                         gnuStream.sendPacket(*p);
1523                         seenIDs.add(p->id);
1524                         outPacketsPri.next();
1525                 } else if ((p=outPacketsNorm.curr()))           // or.. normal packet
1526                 {
1527                         gnuStream.sendPacket(*p);
1528                         seenIDs.add(p->id);
1529                         outPacketsNorm.next();
1530                 }
1531
1532                 int lpt =  sys->getTime()-lastPacket;
1533
1534                 if (!doneBigPing)
1535                 {
1536                         if ((sys->getTime()-lastPing) > 15)
1537                         {
1538                                 gnuStream.ping(7);
1539                                 lastPing = sys->getTime();
1540                                 doneBigPing = true;
1541                         }
1542                 }else{
1543                         if (lpt > packetTimeoutSecs)
1544                         {
1545                                 
1546                                 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1547                                 {
1548                                         gnuStream.ping(1);
1549                                         lastPing = sys->getTime();
1550                                 }
1551
1552                         }
1553                 }
1554                 if (lpt > abortTimeoutSecs)
1555                         throw TimeoutException();
1556
1557
1558                 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1559                 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1560
1561                 unsigned int bytes = totIn+totOut;
1562
1563                 lastTotalIn = sock->totalBytesIn;
1564                 lastTotalOut = sock->totalBytesOut;
1565
1566                 const int serventBandwidth = 1000;
1567
1568                 int delay = sys->idleSleepTime;
1569                 if ((bytes) && (serventBandwidth >= 8))
1570                         delay = (bytes*1000)/(serventBandwidth/8);      // set delay relative packetsize
1571
1572                 if (delay < (int)sys->idleSleepTime)
1573                         delay = sys->idleSleepTime;
1574                 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1575
1576                 sys->sleep(delay);
1577         }
1578
1579 }
1580
1581
1582 // -----------------------------------
1583 void Servent::processRoot()
1584 {
1585         try 
1586         {
1587         
1588                 gnuStream.init(sock);
1589                 setStatus(S_CONNECTED);
1590
1591                 gnuStream.ping(2);
1592
1593                 unsigned int lastConnect = sys->getTime();
1594
1595                 while (thread.active && sock->active())
1596                 {
1597                         if (gnuStream.readPacket(pack))
1598                         {
1599                                 char ipstr[64];
1600                                 sock->host.toStr(ipstr);
1601                                 
1602                                 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1603
1604
1605                                 if (pack.func == GNU_FUNC_PING)         // if ping then pong back some hosts and close
1606                                 {
1607                                         
1608                                         Host hl[32];
1609                                         int cnt = servMgr->getNewestServents(hl,32,sock->host); 
1610                                         if (cnt)
1611                                         {
1612                                                 int start = sys->rnd() % cnt;
1613                                                 int max = cnt>8?8:cnt;
1614
1615                                                 for(int i=0; i<max; i++)
1616                                                 {
1617                                                         GnuPacket pong;
1618                                                         pack.hops = 1;
1619                                                         pong.initPong(hl[start],false,pack);
1620                                                         gnuStream.sendPacket(pong);
1621
1622                                                         char ipstr[64];
1623                                                         hl[start].toStr(ipstr);
1624
1625                                                         //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1626                                                         start = (start+1) % cnt;
1627                                                 }
1628                                                 char str[64];
1629                                                 sock->host.toStr(str);
1630                                                 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1631                                         }else
1632                                         {
1633                                                 LOG_NETWORK("No Pongs to send");
1634                                                 //return;
1635                                         }
1636                                 }else if (pack.func == GNU_FUNC_PONG)           // pong?
1637                                 {
1638                                         MemoryStream pong(pack.data,pack.len);
1639
1640                                         int ip,port;
1641                                         port = pong.readShort();
1642                                         ip = pong.readLong();
1643                                         ip = SWAP4(ip);
1644
1645
1646                                         Host h(ip,port);
1647                                         if ((ip) && (port) && (h.globalIP()))
1648                                         {
1649
1650                                                 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1651                                                 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1652                                         }
1653                                         //return;
1654                                 } else if (pack.func == GNU_FUNC_HIT)
1655                                 {
1656                                         MemoryStream data(pack.data,pack.len);
1657                                         ChanHit hit;
1658                                         gnuStream.readHit(data,hit,pack.hops,pack.id);
1659                                 }
1660
1661                                 //if (gnuStream.packetsIn > 5)  // die if we get too many packets
1662                                 //      return;
1663                         }
1664
1665                         if((sys->getTime()-lastConnect > 60))
1666                                 break;
1667                 }
1668
1669
1670         }catch(StreamException &e)
1671         {
1672                 LOG_ERROR("Relay: %s",e.msg);
1673         }
1674
1675         
1676 }       
1677
1678 // -----------------------------------
1679 int Servent::givProcMain(ThreadInfo *thread)
1680 {
1681 //      thread->lock();
1682         Servent *sv = (Servent*)thread->data;
1683         try 
1684         {
1685                 sv->handshakeGiv(sv->givID);
1686                 sv->handshakeIncoming();
1687
1688         }catch(StreamException &e)
1689         {
1690                 LOG_ERROR("GIV: %s",e.msg);
1691         }
1692
1693         sv->kill();
1694         sys->endThread(thread);
1695         return 0;
1696 }
1697
1698 // -----------------------------------
// Thread entry point for GIV connections: expands the SEH_THREAD macro with
// givProcMain as the worker and this function's name as the second argument.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and wraps the
// worker call in structured exception handling - confirm in that header.
1699 int Servent::givProc(ThreadInfo *thread)
1700 {
1701         SEH_THREAD(givProcMain, Servent::givProc);
1702 }
1703
1704 // -----------------------------------
1705 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1706 {
1707
1708         bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1709         bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1710
1711         bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1712
1713         char tbuf[1024];
1714         MemoryStream mem(tbuf, sizeof(tbuf));
1715         AtomStream atom2(mem);
1716         atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1717                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1718                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1719                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1720                 if (nonFW)
1721                         atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1722                 if (testFW)
1723                         atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1724                 if (sendBCID)
1725                         atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1726         atom.io.write(tbuf, mem.getPosition());
1727
1728
1729         LOG_DEBUG("PCP outgoing waiting for OLEH..");
1730
1731         int numc,numd;
1732         ID4 id = atom.read(numc,numd);
1733         if (id != PCP_OLEH)
1734         {
1735                 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1736                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1737                 throw StreamException("Got unexpected PCP response");
1738         }
1739
1740
1741
1742         char arg[64];
1743
1744         GnuID clientID;
1745         clientID.clear();
1746         rid.clear();
1747         int version=0;
1748         int disable=0;
1749
1750         Host thisHost;
1751
1752         // read OLEH response
1753         for(int i=0; i<numc; i++)
1754         {
1755                 int c,dlen;
1756                 ID4 id = atom.read(c,dlen);
1757
1758                 if (id == PCP_HELO_AGENT)
1759                 {
1760                         atom.readString(arg,sizeof(arg),dlen);
1761                         agent.set(arg);
1762
1763                 }else if (id == PCP_HELO_REMOTEIP)
1764                 {
1765                         thisHost.ip = atom.readInt();
1766
1767                 }else if (id == PCP_HELO_PORT)
1768                 {
1769                         thisHost.port = atom.readShort();
1770
1771                 }else if (id == PCP_HELO_VERSION)
1772                 {
1773                         version = atom.readInt();
1774
1775                 }else if (id == PCP_HELO_DISABLE)
1776                 {
1777                         disable = atom.readInt();
1778
1779                 }else if (id == PCP_HELO_SESSIONID)
1780                 {
1781                         atom.readBytes(rid.id,16);
1782                         if (rid.isSame(servMgr->sessionID))
1783                                 throw StreamException("Servent loopback");
1784
1785                 }else
1786                 {
1787                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1788                         atom.skip(c,dlen);
1789                 }
1790
1791     }
1792
1793
1794         // update server ip/firewall status
1795         if (isTrusted)
1796         {
1797                 if (thisHost.isValid())
1798                 {
1799                         if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1800                         {
1801                                 char ipstr[64];
1802                                 thisHost.toStr(ipstr);
1803                                 LOG_DEBUG("Got new ip: %s",ipstr);
1804                                 servMgr->serverHost.ip = thisHost.ip;
1805                         }
1806
1807                         if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1808                         {
1809                                 if (thisHost.port && thisHost.globalIP())
1810                                         servMgr->setFirewall(ServMgr::FW_OFF);
1811                                 else
1812                                         servMgr->setFirewall(ServMgr::FW_ON);
1813                         }
1814                 }
1815
1816                 if (disable == 1)
1817                 {
1818                         LOG_ERROR("client disabled: %d",disable);
1819                         servMgr->isDisabled = true;             
1820                 }else
1821                 {
1822                         servMgr->isDisabled = false;            
1823                 }
1824         }
1825
1826
1827
1828         if (!rid.isSet())
1829         {
1830                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1831                 throw StreamException("Remote host not identified");
1832         }
1833
1834         LOG_DEBUG("PCP Outgoing handshake complete.");
1835
1836 }
1837
1838 // -----------------------------------
1839 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1840 {
1841         int numc,numd;
1842         ID4 id = atom.read(numc,numd);
1843
1844
1845         if (id != PCP_HELO)
1846         {
1847                 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1848                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1849                 throw StreamException("Got unexpected PCP response");
1850         }
1851
1852         char arg[64];
1853
1854         ID4 osType;
1855
1856         int version=0;
1857
1858         int pingPort=0;
1859
1860         GnuID bcID;
1861         GnuID clientID;
1862
1863         bcID.clear();
1864         clientID.clear();
1865
1866         rhost.port = 0;
1867
1868         for(int i=0; i<numc; i++)
1869         {
1870
1871                 int c,dlen;
1872                 ID4 id = atom.read(c,dlen);
1873
1874                 if (id == PCP_HELO_AGENT)
1875                 {
1876                         atom.readString(arg,sizeof(arg),dlen);
1877                         agent.set(arg);
1878
1879                 }else if (id == PCP_HELO_VERSION)
1880                 {
1881                         version = atom.readInt();
1882
1883                 }else if (id == PCP_HELO_SESSIONID)
1884                 {
1885                         atom.readBytes(rid.id,16);
1886                         if (rid.isSame(servMgr->sessionID))
1887                                 throw StreamException("Servent loopback");
1888
1889                 }else if (id == PCP_HELO_BCID)
1890                 {
1891                         atom.readBytes(bcID.id,16);
1892
1893                 }else if (id == PCP_HELO_OSTYPE)
1894                 {
1895                         osType = atom.readInt();
1896                 }else if (id == PCP_HELO_PORT)
1897                 {
1898                         rhost.port = atom.readShort();
1899                 }else if (id == PCP_HELO_PING)
1900                 {
1901                         pingPort = atom.readShort();
1902                 }else
1903                 {
1904                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1905                         atom.skip(c,dlen);
1906                 }
1907
1908     }
1909
1910         if (version)
1911                 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1912
1913
1914         if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1915                 rhost.ip = servMgr->serverHost.ip;
1916
1917         if (pingPort)
1918         {
1919                 char ripStr[64];
1920                 rhost.toStr(ripStr);
1921                 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1922                 rhost.port = pingPort;
1923                 if (!rhost.globalIP() || !pingHost(rhost,rid))
1924                         rhost.port = 0;
1925         }
1926
1927         if (servMgr->isRoot)
1928         {
1929                 if (bcID.isSet())
1930                 {
1931                         if (bcID.getFlags() & 1)        // private
1932                         {
1933                                 BCID *bcid = servMgr->findValidBCID(bcID);
1934                                 if (!bcid || (bcid && !bcid->valid))
1935                                 {
1936                                         atom.writeParent(PCP_OLEH,1);
1937                                         atom.writeInt(PCP_HELO_DISABLE,1);
1938                                         throw StreamException("Client is banned");
1939                                 }
1940                         }
1941                 }
1942         }
1943
1944
1945         char tbuf[1024];
1946         MemoryStream mem(tbuf, sizeof(tbuf));
1947         AtomStream atom2(mem);
1948         atom2.writeParent(PCP_OLEH,5);
1949                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1950                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1951                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1952                 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1953                 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1954
1955         if (version)
1956         {
1957                 if (version < PCP_CLIENT_MINVERSION)
1958                 {
1959                         atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1960                         atom.io.write(tbuf, mem.getPosition());
1961                         throw StreamException("Agent is not valid");
1962                 }
1963         }
1964
1965         if (!rid.isSet())
1966         {
1967                 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1968                 atom.io.write(tbuf, mem.getPosition());
1969                 throw StreamException("Remote host not identified");
1970         }
1971
1972
1973
1974         if (servMgr->isRoot)
1975         {
1976                 servMgr->writeRootAtoms(atom2,false);
1977         }
1978
1979         atom.io.write(tbuf, mem.getPosition());
1980
1981         LOG_DEBUG("PCP Incoming handshake complete.");
1982
1983 }
1984
1985 // -----------------------------------
1986 void Servent::processIncomingPCP(bool suggestOthers)
1987 {
1988         PCPStream::readVersion(*sock);
1989
1990
1991         AtomStream atom(*sock);
1992         Host rhost = sock->host;
1993
1994         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1995
1996
1997         bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1998                                                         || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1999         bool unavailable = servMgr->controlInFull();
2000         bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
2001
2002         char rstr[64];
2003         rhost.toStr(rstr);
2004
2005         if (unavailable || alreadyConnected || offair)
2006         {
2007                 int error;
2008
2009                 if (alreadyConnected)
2010                         error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
2011                 else if (unavailable)
2012                         error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
2013                 else if (offair)
2014                         error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
2015                 else 
2016                         error = PCP_ERROR_QUIT;
2017
2018
2019                 if (suggestOthers)
2020                 {
2021
2022                         ChanHit best;
2023                         ChanHitSearch chs;
2024
2025                         int cnt=0;
2026                         for(int i=0; i<8; i++)
2027                         {
2028                                 best.init();
2029
2030                                 // find best hit on this network                        
2031                                 if (!rhost.globalIP())
2032                                 {
2033                                         chs.init();
2034                                         chs.matchHost = servMgr->serverHost;
2035                                         chs.waitDelay = 2;
2036                                         chs.excludeID = remoteID;
2037                                         chs.trackersOnly = true;
2038                                         chs.useBusyControls = false;
2039                                         if (chanMgr->pickHits(chs))
2040                                                 best = chs.best[0];
2041                                 }
2042
2043                                 // find best hit on same network                        
2044                                 if (!best.host.ip)
2045                                 {
2046                                         chs.init();
2047                                         chs.matchHost = rhost;
2048                                         chs.waitDelay = 2;
2049                                         chs.excludeID = remoteID;
2050                                         chs.trackersOnly = true;
2051                                         chs.useBusyControls = false;
2052                                         if (chanMgr->pickHits(chs))
2053                                                 best = chs.best[0];
2054                                 }
2055
2056                                 // else find best hit on other networks
2057                                 if (!best.host.ip)
2058                                 {
2059                                         chs.init();
2060                                         chs.waitDelay = 2;
2061                                         chs.excludeID = remoteID;
2062                                         chs.trackersOnly = true;
2063                                         chs.useBusyControls = false;
2064                                         if (chanMgr->pickHits(chs))
2065                                                 best = chs.best[0];
2066                                 }
2067
2068                                 if (!best.host.ip)
2069                                         break;
2070                                 
2071                                 GnuID noID;
2072                                 noID.clear();
2073                                 best.writeAtoms(atom,noID);
2074                                 cnt++;
2075                         }
2076                         if (cnt)
2077                         {
2078                                 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2079                         }
2080                         else if (rhost.port)
2081                         {
2082                                 // send push request to best firewalled tracker on other network
2083                                 chs.init();
2084                                 chs.waitDelay = 30;
2085                                 chs.excludeID = remoteID;
2086                                 chs.trackersOnly = true;
2087                                 chs.useFirewalled = true;
2088                                 chs.useBusyControls = false;
2089                                 if (chanMgr->pickHits(chs))
2090                                 {
2091                                         best = chs.best[0];
2092                                         GnuID noID;
2093                                         noID.clear();
2094                                         int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2095                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2096                                 }
2097                         }else
2098                         {
2099                                 LOG_DEBUG("No available trackers");
2100                         }
2101                 }
2102
2103
2104                 LOG_ERROR("Sending QUIT to incoming: %d",error);
2105
2106                 atom.writeInt(PCP_QUIT,error);
2107                 return;         
2108         }
2109         
2110
2111         type = T_CIN;
2112         setStatus(S_CONNECTED);
2113
2114         atom.writeInt(PCP_OK,0);
2115
2116         // ask for update
2117         atom.writeParent(PCP_ROOT,1);
2118                 atom.writeParent(PCP_ROOT_UPDATE,0);
2119
2120         pcpStream = new PCPStream(remoteID);
2121
2122         int error = 0;
2123         BroadcastState bcs;
2124         while (!error && thread.active && !sock->eof())
2125         {
2126                 error = pcpStream->readPacket(*sock,bcs);
2127                 sys->sleepIdle();
2128
2129                 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2130                         error = PCP_ERROR_OFFAIR;
2131                 if (peercastInst->isQuitting)
2132                         error = PCP_ERROR_SHUTDOWN;
2133         }
2134
2135         pcpStream->flush(*sock);
2136
2137         error += PCP_ERROR_QUIT;
2138         atom.writeInt(PCP_QUIT,error);
2139
2140         LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2141
2142 }
2143
2144 // -----------------------------------
2145 int Servent::outgoingProcMain(ThreadInfo *thread)
2146 {
2147 //      thread->lock();
2148         LOG_DEBUG("COUT started");
2149
2150         Servent *sv = (Servent*)thread->data;
2151                 
2152         GnuID noID;
2153         noID.clear();
2154         sv->pcpStream = new PCPStream(noID);
2155
2156         while (sv->thread.active)
2157         {
2158                 sv->setStatus(S_WAIT);
2159
2160                 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2161                 {
2162                         ChanHit bestHit;
2163                         ChanHitSearch chs;
2164                         char ipStr[64];
2165
2166                         do
2167                         {
2168                                 bestHit.init();
2169
2170                                 if (servMgr->rootHost.isEmpty())
2171                                         break;
2172
2173                                 if (sv->pushSock)
2174                                 {
2175                                         sv->sock = sv->pushSock;
2176                                         sv->pushSock = NULL;
2177                                         bestHit.host = sv->sock->host;
2178                                         break;
2179                                 }
2180
2181                                 GnuID noID;
2182                                 noID.clear();
2183                                 ChanHitList *chl = chanMgr->findHitListByID(noID);
2184                                 if (chl)
2185                                 {
2186                                         // find local tracker
2187                                         chs.init();
2188                                         chs.matchHost = servMgr->serverHost;
2189                                         chs.waitDelay = MIN_TRACKER_RETRY;
2190                                         chs.excludeID = servMgr->sessionID;
2191                                         chs.trackersOnly = true;
2192                                         if (!chl->pickHits(chs))
2193                                         {
2194                                                 // else find global tracker
2195                                                 chs.init();
2196                                                 chs.waitDelay = MIN_TRACKER_RETRY;
2197                                                 chs.excludeID = servMgr->sessionID;
2198                                                 chs.trackersOnly = true;
2199                                                 chl->pickHits(chs);
2200                                         }
2201
2202                                         if (chs.numResults)
2203                                         {
2204                                                 bestHit = chs.best[0];
2205                                         }
2206                                 }
2207
2208
2209                                 unsigned int ctime = sys->getTime();
2210
2211                                 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2212                                 {
2213                                         bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2214                                         bestHit.yp = true;
2215                                         chanMgr->lastYPConnect = ctime;
2216                                 }
2217                                 sys->sleepIdle();
2218
2219                         }while (!bestHit.host.ip && (sv->thread.active));
2220
2221
2222                         if (!bestHit.host.ip)           // give up
2223                         {
2224                                 LOG_ERROR("COUT giving up");
2225                                 break;
2226                         }
2227
2228
2229                         bestHit.host.toStr(ipStr);
2230
2231                         int error=0;
2232                         try 
2233                         {
2234
2235                                 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2236
2237                                 if (!sv->sock)
2238                                 {
2239                                         sv->setStatus(S_CONNECTING);
2240                                         sv->sock = sys->createSocket();
2241                                         if (!sv->sock)
2242                                                 throw StreamException("Unable to create socket");
2243                                         sv->sock->open(bestHit.host);
2244                                         sv->sock->connect();
2245
2246                                 }
2247
2248                                 sv->sock->setReadTimeout(30000);
2249                                 AtomStream atom(*sv->sock);
2250
2251                                 sv->setStatus(S_HANDSHAKE);
2252
2253                                 Host rhost = sv->sock->host;
2254                                 atom.writeInt(PCP_CONNECT,1);
2255                                 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2256
2257                                 sv->setStatus(S_CONNECTED);
2258
2259                                 LOG_DEBUG("COUT to %s: OK",ipStr);
2260
2261                                 sv->pcpStream->init(sv->remoteID);
2262
2263                                 BroadcastState bcs;
2264                                 bcs.servent_id = sv->servent_id;
2265                                 error = 0;
2266                                 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2267                                 {
2268                                         error = sv->pcpStream->readPacket(*sv->sock,bcs);
2269
2270                                         sys->sleepIdle();
2271
2272                                         if (!chanMgr->isBroadcasting())
2273                                                 error = PCP_ERROR_OFFAIR;
2274                                         if (peercastInst->isQuitting)
2275                                                 error = PCP_ERROR_SHUTDOWN;
2276
2277                                         if (sv->pcpStream->nextRootPacket)
2278                                                 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2279                                                         error = PCP_ERROR_NOROOT;
2280                                 }
2281                                 sv->setStatus(S_CLOSING);
2282
2283                                 sv->pcpStream->flush(*sv->sock);
2284
2285                                 error += PCP_ERROR_QUIT;
2286                                 atom.writeInt(PCP_QUIT,error);
2287
2288                                 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2289
2290                         }catch(TimeoutException &e)
2291                         {
2292                                 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2293                                 sv->setStatus(S_TIMEOUT);
2294                         }catch(StreamException &e)
2295                         {
2296                                 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2297                                 sv->setStatus(S_ERROR);
2298                         }
2299
2300                         try
2301                         {
2302                                 if (sv->sock)
2303                                 {
2304                                         sv->sock->close();
2305                                         delete sv->sock;
2306                                         sv->sock = NULL;
2307                                 }
2308
2309                         }catch(StreamException &) {}
2310
2311                         // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2312                         if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2313                                 chanMgr->deadHit(bestHit);
2314
2315                 }
2316
2317                 sys->sleepIdle();
2318         }
2319
2320         sv->kill();
2321         sys->endThread(thread);
2322         LOG_DEBUG("COUT ended");
2323         return 0;
2324 }
2325 // -----------------------------------
// Thread entry point for the outgoing control connection: expands the
// SEH_THREAD macro with outgoingProcMain as the worker and this function's
// name as the second argument.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and wraps the
// worker call in structured exception handling - confirm in that header.
2326 int Servent::outgoingProc(ThreadInfo *thread)
2327 {
2328         SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2329 }
2330 // -----------------------------------
2331 int Servent::incomingProcMain(ThreadInfo *thread)
2332 {
2333 //      thread->lock();
2334
2335         Servent *sv = (Servent*)thread->data;
2336         
2337         char ipStr[64];
2338         sv->sock->host.toStr(ipStr);
2339
2340         try 
2341         {
2342                 sv->handshakeIncoming();
2343         }catch(HTTPException &e)
2344         {
2345                 try
2346                 {
2347                         sv->sock->writeLine(e.msg);
2348                         if (e.code == 401)
2349                                 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2350                         sv->sock->writeLine("");
2351                 }catch(StreamException &){}
2352                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2353         }catch(StreamException &e)
2354         {
2355                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2356         }
2357
2358
2359         sv->kill();
2360         sys->endThread(thread);
2361         return 0;
2362 }
2363 // -----------------------------------
// Thread entry point for accepted connections: expands the SEH_THREAD macro
// with incomingProcMain as the worker and this function's name as the second
// argument.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and wraps the
// worker call in structured exception handling - confirm in that header.
2364 int Servent::incomingProc(ThreadInfo *thread)
2365 {
2366         SEH_THREAD(incomingProcMain, Servent::incomingProc);
2367 }
2368 // -----------------------------------
// Runs a servent connection: sets the status to S_HANDSHAKE, performs
// handshakeIn(), then hands over to processGnutella().
// Throws StreamException("Servent has no socket") when sock is null once
// handshakeIn() has returned.
// NOTE(review): the null check comes after handshakeIn(); whether that call
// can clear sock is not visible here - confirm against handshakeIn().
2369 void Servent::processServent()
2370 {
2371         setStatus(S_HANDSHAKE);
2372
2373         handshakeIn();
2374
2375         if (!sock)
2376                 throw StreamException("Servent has no socket");
2377
2378         processGnutella();
2379 }
2380
2381 // -----------------------------------
2382 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2383 {       
2384         if (!doneHandshake)
2385         {
2386                 setStatus(S_HANDSHAKE);
2387
2388                 if (!handshakeStream(chanInfo))
2389                         return;
2390         }
2391
2392         if (chanInfo.id.isSet())
2393         {
2394
2395                 chanID = chanInfo.id;
2396
2397                 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2398
2399                 if (!waitForChannelHeader(chanInfo))
2400                         throw StreamException("Channel not ready");
2401
2402                 servMgr->totalStreams++;
2403
2404                 Host host = sock->host;
2405                 host.port = 0;  // force to 0 so we ignore the incoming port
2406
2407                 Channel *ch = chanMgr->findChannelByID(chanID);
2408                 if (!ch)
2409                         throw StreamException("Channel not found");
2410
2411                 if (outputProtocol == ChanInfo::SP_HTTP)
2412                 {
2413                         if ((addMetadata) && (chanMgr->icyMetaInterval))
2414                                 sendRawMetaChannel(chanMgr->icyMetaInterval);
2415                         else 
2416                                 sendRawChannel(true,true);
2417
2418                 }else if (outputProtocol == ChanInfo::SP_MMS)
2419                 {
2420                         if (nsSwitchNum)
2421                         {
2422                                 sendRawChannel(true,true);
2423                         }else
2424                         {
2425                                 sendRawChannel(true,false);
2426                         }
2427
2428                 }else if (outputProtocol  == ChanInfo::SP_PCP)
2429                 {
2430                         sendPCPChannel();
2431
2432                 } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
2433                 {
2434                         sendPeercastChannel();
2435                 }
2436         }
2437
2438         setStatus(S_CLOSING);
2439 }
2440
2441 // -----------------------------------------
2442 #if 0
2443 // debug
2444                 FileStream file;
2445                 file.openReadOnly("c://test.mp3");
2446
2447                 LOG_DEBUG("raw file read");
2448                 char buf[4000];
2449                 int cnt=0;
2450                 while (!file.eof())
2451                 {
2452                         LOG_DEBUG("send %d",cnt++);
2453                         file.read(buf,sizeof(buf));
2454                         sock->write(buf,sizeof(buf));
2455
2456                 }
2457                 file.close();
2458                 LOG_DEBUG("raw file sent");
2459
2460         return;
2461 // debug
2462 #endif
2463 // -----------------------------------
2464 bool Servent::waitForChannelHeader(ChanInfo &info)
2465 {
2466         for(int i=0; i<30*10; i++)
2467         {
2468                 Channel *ch = chanMgr->findChannelByID(info.id);
2469                 if (!ch)
2470                         return false;
2471
2472                 if (ch->isPlaying() && (ch->rawData.writePos>0))
2473                         return true;
2474
2475                 if (!thread.active || !sock->active())
2476                         break;
2477                 sys->sleep(100);
2478         }
2479         return false;
2480 }
2481 // -----------------------------------
2482 void Servent::sendRawChannel(bool sendHead, bool sendData)
2483 {
2484         try
2485         {
2486
2487                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2488
2489                 Channel *ch = chanMgr->findChannelByID(chanID);
2490                 if (!ch)
2491                         throw StreamException("Channel not found");
2492
2493                 setStatus(S_CONNECTED);
2494
2495                 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2496
2497                 if (sendHead)
2498                 {
2499                         ch->headPack.writeRaw(*sock);
2500                         streamPos = ch->headPack.pos + ch->headPack.len;
2501                         LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2502                 }
2503
2504                 if (sendData)
2505                 {
2506
2507                         unsigned int streamIndex = ch->streamIndex;
2508                         unsigned int connectTime = sys->getTime();
2509                         unsigned int lastWriteTime = connectTime;
2510
2511                         while ((thread.active) && sock->active())
2512                         {
2513                                 ch = chanMgr->findChannelByID(chanID);
2514
2515                                 if (ch)
2516                                 {
2517
2518                                         if (streamIndex != ch->streamIndex)
2519                                         {
2520                                                 streamIndex = ch->streamIndex;
2521                                                 streamPos = ch->headPack.pos;
2522                                                 LOG_DEBUG("sendRaw got new stream index");
2523                                         }
2524
2525                                         ChanPacket rawPack;
2526                                         if (ch->rawData.findPacket(streamPos,rawPack))
2527                                         {
2528                                                 if (syncPos != rawPack.sync)
2529                                                         LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2530                                                 syncPos = rawPack.sync+1;
2531
2532                                                 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2533                                                 {
2534                                                         rawPack.writeRaw(*sock);
2535                                                         lastWriteTime = sys->getTime();
2536                                                 }
2537
2538                                                 if (rawPack.pos < streamPos)
2539                                                         LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2540                                                 streamPos = rawPack.pos+rawPack.len;
2541                                         } else if (sock->readReady()) {
2542                                                 char c;
2543                                                 int error = sock->readUpto(&c, 1);
2544                                                 if (error == 0) sock->close();
2545                                         }
2546                                 }
2547
2548                                 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2549                                         throw TimeoutException();
2550
2551                                 sys->sleepIdle();
2552                         }
2553                 }
2554         }catch(StreamException &e)
2555         {
2556                 LOG_ERROR("Stream channel: %s",e.msg);
2557         }
2558 }
2559
#if 0
// -----------------------------------
// DISABLED: this whole function is compiled out by the surrounding "#if 0".
//
// Collects the ids of all channels in chanMgr->channels that report
// isPlaying(), optionally writes each one's head packet (sendHead), and then
// (sendData) forwards raw T_DATA / T_HEAD packets while the thread and the
// socket stay active. StreamExceptions are caught and logged.
//
// NOTE(review): as written the data loop starts at index 1 and ends with an
// unconditional "break", so at most the channel at index 1 is serviced per
// pass; this looks unfinished - confirm before re-enabling.
// NOTE(review): chanStreamPos / chanStreamIndex are only filled in the
// sendHead branch, so with sendHead == false they are read uninitialised.
void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
{
        try
        {
                // Per-channel stream state, indexed in parallel with chanIDs.
                unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
                unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
                GnuID chanIDs[ChanMgr::MAX_CHANNELS];
                int numChanIDs=0;
                // Snapshot the ids of the channels that are playing right now.
                for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
                {
                        Channel *ch = &chanMgr->channels[i];
                        if (ch->isPlaying())
                                chanIDs[numChanIDs++]=ch->info.id;
                }



                setStatus(S_CONNECTED);


                if (sendHead)
                {
                        // Write every channel's header and remember where its data starts.
                        for(int i=0; i<numChanIDs; i++)
                        {
                                Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
                                if (ch)
                                {
                                        LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
                                        ch->headPack.writeRaw(*sock);
                                        chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
                                        chanStreamIndex[i] = ch->streamIndex;
                                        LOG_DEBUG("Sent %d bytes header",ch->headPack.len);

                                }
                        }
                }

                if (sendData)
                {

                        // Not read anywhere below.
                        unsigned int connectTime=sys->getTime();

                        while ((thread.active) && sock->active())
                        {

                                // Starts at 1, not 0 (see NOTE(review) above).
                                for(int i=1; i<numChanIDs; i++)
                                {
                                        Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
                                        if (ch)
                                        {
                                                // A changed stream index restarts this channel at its header position.
                                                if (chanStreamIndex[i] != ch->streamIndex)
                                                {
                                                        chanStreamIndex[i] = ch->streamIndex;
                                                        chanStreamPos[i] = ch->headPack.pos;
                                                        LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
                                                }

                                                ChanPacket rawPack;
                                                if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
                                                {
                                                        if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
                                                                rawPack.writeRaw(*sock);


                                                        if (rawPack.pos < chanStreamPos[i])
                                                                LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
                                                        chanStreamPos[i] = rawPack.pos+rawPack.len;


                                                        //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
                                                }                                               
                                        }
                                        // Unconditional: leaves the for loop after its first iteration.
                                        break;
                                }
                                

                                sys->sleepIdle();
                        }
                }
        }catch(StreamException &e)
        {
                LOG_ERROR("Stream channel: %s",e.msg);
        }
}
#endif
2647
2648 // -----------------------------------
2649 void Servent::sendRawMetaChannel(int interval)
2650 {
2651
2652         try
2653         {
2654                 Channel *ch = chanMgr->findChannelByID(chanID);
2655                 if (!ch)
2656                         throw StreamException("Channel not found");
2657
2658                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2659
2660                 setStatus(S_CONNECTED);
2661
2662                 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2663
2664
2665                 String lastTitle,lastURL;
2666
2667                 int             lastMsgTime=sys->getTime();
2668                 bool    showMsg=true;
2669
2670                 char buf[16384];
2671                 int bufPos=0;
2672
2673                 if ((interval > sizeof(buf)) || (interval < 1))
2674                         throw StreamException("Bad ICY Meta Interval value");
2675
2676                 unsigned int connectTime = sys->getTime();
2677                 unsigned int lastWriteTime = connectTime;
2678
2679                 streamPos = 0;          // raw meta channel has no header (its MP3)
2680
2681                 while ((thread.active) && sock->active())
2682                 {
2683                         ch = chanMgr->findChannelByID(chanID);
2684
2685                         if (ch)
2686                         {
2687
2688                                 ChanPacket rawPack;
2689                                 if (ch->rawData.findPacket(streamPos,rawPack))
2690                                 {
2691
2692                                         if (syncPos != rawPack.sync)
2693                                                 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2694                                         syncPos = rawPack.sync+1;
2695
2696                                         MemoryStream mem(rawPack.data,rawPack.len);
2697
2698                                         if (rawPack.type == ChanPacket::T_DATA)
2699                                         {
2700
2701                                                 int len = rawPack.len;
2702                                                 char *p = rawPack.data;
2703                                                 while (len)
2704                                                 {
2705                                                         int rl = len;
2706                                                         if ((bufPos+rl) > interval)
2707                                                                 rl = interval-bufPos;
2708                                                         memcpy(&buf[bufPos],p,rl);
2709                                                         bufPos+=rl;
2710                                                         p+=rl;
2711                                                         len-=rl;
2712
2713                                                         if (bufPos >= interval)
2714                                                         {
2715                                                                 bufPos = 0;     
2716                                                                 sock->write(buf,interval);
2717                                                                 lastWriteTime = sys->getTime();
2718
2719                                                                 if (chanMgr->broadcastMsgInterval)
2720                                                                         if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2721                                                                         {
2722                                                                                 showMsg ^= true;
2723                                                                                 lastMsgTime = sys->getTime();
2724                                                                         }
2725
2726                                                                 String *metaTitle = &ch->info.track.title;
2727                                                                 if (!ch->info.comment.isEmpty() && (showMsg))
2728                                                                         metaTitle = &ch->info.comment;
2729
2730
2731                                                                 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2732                                                                 {
2733
2734                                                                         char tmp[1024];
2735                                                                         String title,url;
2736
2737                                                                         title = *metaTitle;
2738                                                                         url = ch->info.url;
2739
2740                                                                         title.convertTo(String::T_META);
2741                                                                         url.convertTo(String::T_META);
2742
2743                                                                         sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2744                                                                         int len = ((strlen(tmp) + 15+1) / 16);
2745                                                                         sock->writeChar(len);
2746                                                                         sock->write(tmp,len*16);
2747
2748                                                                         lastTitle = *metaTitle;
2749                                                                         lastURL = ch->info.url;
2750
2751                                                                         LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2752
2753                                                                 }else
2754                                                                 {
2755                                                                         sock->writeChar(0);                                     
2756                                                                 }
2757
2758                                                         }
2759                                                 }
2760                                         }
2761                                         streamPos = rawPack.pos + rawPack.len;
2762                                 }
2763                         }
2764                         if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2765                                 throw TimeoutException();
2766
2767                         sys->sleepIdle();
2768
2769                 }
2770         }catch(StreamException &e)
2771         {
2772                 LOG_ERROR("Stream channel: %s",e.msg);
2773         }
2774 }
2775 // -----------------------------------
2776 void Servent::sendPeercastChannel()
2777 {
2778         try
2779         {
2780                 setStatus(S_CONNECTED);
2781
2782                 Channel *ch = chanMgr->findChannelByID(chanID);
2783                 if (!ch)
2784                         throw StreamException("Channel not found");
2785
2786                 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2787
2788                 sock->writeTag("PCST");
2789
2790                 ChanPacket pack;
2791
2792                 ch->headPack.writePeercast(*sock);
2793
2794                 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2795                 pack.writePeercast(*sock);
2796         
2797                 streamPos = 0;
2798                 unsigned int syncPos=0;
2799                 while ((thread.active) && sock->active())
2800                 {
2801                         ch = chanMgr->findChannelByID(chanID);
2802                         if (ch)
2803                         {
2804
2805                                 ChanPacket rawPack;
2806                                 if (ch->rawData.findPacket(streamPos,rawPack))
2807                                 {
2808                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2809                                         {
2810                                                 sock->writeTag("SYNC");
2811                                                 sock->writeShort(4);
2812                                                 sock->writeShort(0);
2813                                                 sock->write(&syncPos,4);
2814                                                 syncPos++;
2815
2816                                                 rawPack.writePeercast(*sock);
2817                                         }
2818                                         streamPos = rawPack.pos + rawPack.len;
2819                                 }
2820                         }
2821                         sys->sleepIdle();
2822                 }
2823
2824         }catch(StreamException &e)
2825         {
2826                 LOG_ERROR("Stream channel: %s",e.msg);
2827         }
2828 }
2829
2830 //WLock canStreamLock;
2831
2832 // -----------------------------------
// Streams the current channel (chanID) to the peer as PCP atoms and services
// incoming PCP packets on the same socket.
//
// Throws StreamException("Channel not found") if the channel does not exist
// on entry (that throw is outside the try block, so it reaches the caller).
// StreamExceptions raised while streaming are caught and logged; afterwards
// the PCP stream is flushed and a PCP_QUIT atom carrying `error` is written
// on a best-effort basis.
//
// NOTE(review): pcpStream is allocated here with new and is not deleted in
// this function - presumably released elsewhere; confirm.
void Servent::sendPCPChannel()
{
        // skipCheck becomes true after the first observed forward gap; later gaps are logged.
        bool skipCheck = false;
        // ptime / npacket / upsize implement a budget that is refilled whenever
        // sys->getTime() returns a new value.
        unsigned int ptime = 0;
        int npacket = 0, upsize = 0;

        Channel *ch = chanMgr->findChannelByID(chanID);
        if (!ch)
                throw StreamException("Channel not found");

        // Atom writer that goes straight to the socket (initial CHAN atom and final QUIT).
        AtomStream atom(*sock);

        pcpStream = new PCPStream(remoteID);
        // Last result of pcpStream->readPacket(); reported in the PCP_QUIT atom.
        int error=0;

        try
        {

                LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);


//              setStatus(S_CONNECTED);

                //canStreamLock.on();
                //thread.active = canStream(ch);
                //setStatus(S_CONNECTED);
                //canStreamLock.off();

                lastSkipTime = 0;
                lastSkipCount = 0;
                waitPort = 0;

                if (thread.active){
                        // Initial PCP_CHAN atom: id, info and track atoms, plus the header
                        // packet when sendHeader is set (hence 3 or 4 children).
                        atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
                                atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
                                ch->info.writeInfoAtoms(atom);
                                ch->info.writeTrackAtoms(atom);
                                if (sendHeader)
                                {
                                        atom.writeParent(PCP_CHAN_PKT,3);
                                        atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
                                        atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
                                        atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);

                                        // Only reposition when no stream position has been set yet.
                                        if (streamPos == 0)
                                                streamPos = ch->headPack.pos+ch->headPack.len;
                                        LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
                                }
                }

                unsigned int streamIndex = ch->streamIndex;

                // Each outgoing packet is first serialised into pbuf through atom2
                // and then written to the socket in a single call.
                ChanPacket rawPack;
                char pbuf[ChanPacket::MAX_DATALEN*3];
                MemoryStream mems(pbuf,sizeof(pbuf));
                AtomStream atom2(mems);

                // Note: unlike the raw senders, this loop tests only thread.active.
                while (thread.active)
                {

                        // Fresh lookup every pass; this declaration shadows the outer ch.
                        Channel *ch = chanMgr->findChannelByID(chanID);

                        if (ch)
                        {

                                // A changed stream index restarts us at the header position.
                                if (streamIndex != ch->streamIndex)
                                {
                                        streamIndex = ch->streamIndex;
                                        streamPos = ch->headPack.pos;
                                        LOG_DEBUG("sendPCPStream got new stream index");                                                
                                }

                                // Reuse the staging buffer from its start.
                                mems.rewind();

                                if (ch->rawData.findPacket(streamPos,rawPack))
                                {
                                        // The packet found lies ahead of the requested position
                                        // and is not flagged as skip: a gap in what we send.
                                        if ((streamPos < rawPack.pos) && !rawPack.skip){
                                                if (skipCheck){
                                                        char tmp[32];
                                                        getHost().IPtoStr(tmp);
                                                        LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);

                                                        // A second gap within the same getTime() tick: jump to
                                                        // the latest position. The continue also bypasses the
                                                        // rest of this pass, including sleepIdle().
                                                        if (sys->getTime() == lastSkipTime) {
                                                                LOG_DEBUG("##### skip all buffer");
                                                                streamPos = ch->rawData.getLatestPos();
                                                                continue;
                                                        }

                                                        lastSkipTime = sys->getTime();
                                                        lastSkipCount++;
                                                } else {
                                                        skipCheck = true;
                                                }
                                        }

                                        if (rawPack.type == ChanPacket::T_HEAD)
                                        {
                                                // Header packet: always written directly.
                                                atom2.writeParent(PCP_CHAN,2);
                                                        atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
                                                        atom2.writeParent(PCP_CHAN_PKT,3);
                                                                atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
                                                                atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
                                                                atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);

                                                sock->write(pbuf, mems.getPosition());
                                        }else if (rawPack.type == ChanPacket::T_DATA)
                                        {
                                                atom2.writeParent(PCP_CHAN,2);
                                                        atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
                                                        atom2.writeParent(PCP_CHAN_PKT,3);
                                                                atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
                                                                atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
                                                                atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);

                                                // On WIN32 data goes through the socket's buffered writer
                                                // and the skip statistics are copied back from it.
#ifdef WIN32
                                                sock->bufferingWrite(pbuf, mems.getPosition());
                                                lastSkipTime = sock->bufList.lastSkipTime;
                                                lastSkipCount = sock->bufList.skipCount;
#else
                                                sock->write(pbuf, mems.getPosition());
#endif
                                        }

                                        if (rawPack.pos < streamPos)
                                                LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);

                                        //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());

                                        streamPos = rawPack.pos+rawPack.len;
                                }
                        } else {
                                throw StreamException("Channel not found");
                        }

                        // WIN32: call the buffered writer with no new data
                        // (presumably to drain what is pending - confirm).
#ifdef WIN32
                        sock->bufferingWrite(NULL, 0);
                        lastSkipTime = sock->bufList.lastSkipTime;
                        lastSkipCount = sock->bufList.skipCount;
#endif
                        BroadcastState bcs;
                        bcs.servent_id = servent_id;
//                      error = pcpStream->readPacket(*sock,bcs);

                        // Refill the packet and upstream-size budgets when the clock value changes.
                        unsigned int t = sys->getTime();
                        if (t != ptime) {
                                ptime = t;
                                npacket = MAX_PROC_PACKETS;
                                upsize = MAX_OUTWARD_SIZE;
                        }

                        // Push queued data out, charging it against the size budget.
                        int len = pcpStream->flushUb(*sock, upsize);
                        upsize -= len;

                        // Process incoming PCP packets while budget remains and data is ready.
                        while (npacket > 0 && sock->readReady()) {
                                npacket--;
                                error = pcpStream->readPacket(*sock,bcs);
                                if (error)
                                        throw StreamException("PCP exception");
                        }

                        sys->sleepIdle();

                }

                LOG_DEBUG("PCP channel stream closed normally.");

        }catch(StreamException &e)
        {
                LOG_ERROR("Stream channel: %s",e.msg);
        }

        // Best-effort shutdown: flush and send PCP_QUIT; failures are ignored.
        try
        {
                pcpStream->flush(*sock);
                atom.writeInt(PCP_QUIT,error);
        }catch(StreamException &) {}

}
3011
3012 // -----------------------------------
3013 int Servent::serverProcMain(ThreadInfo *thread)
3014 {
3015 //      thread->lock();
3016
3017
3018         Servent *sv = (Servent*)thread->data;
3019
3020         try 
3021         {
3022                 if (!sv->sock)
3023                         throw StreamException("Server has no socket");
3024
3025                 sv->setStatus(S_LISTENING);
3026
3027
3028                 char servIP[64];
3029                 sv->sock->host.toStr(servIP);
3030
3031                 if (servMgr->isRoot)
3032                         LOG_DEBUG("Root Server started: %s",servIP);
3033                 else
3034                         LOG_DEBUG("Server started: %s",servIP);
3035                 
3036
3037                 while ((thread->active) && (sv->sock->active()))
3038                 {
3039                         if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3040                         {
3041                                 ClientSocket *cs = sv->sock->accept();
3042
3043                                 // \95s\90³\82È\83\\81[\83X\83A\83h\83\8c\83X(IPv4\83}\83\8b\83`\83L\83\83\83X\83g)\82ð\8f\9c\8aO
3044                                 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3045                                 {
3046                                         char ip[64];
3047                                         cs->host.toStr(ip);
3048                                         cs->close();
3049                                         LOG_ERROR("reject incoming multicast address: %s", ip);
3050                                         peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3051                                 } else
3052                                 if (cs)
3053                                 {
3054                                         // countermeasure against DoS Atk
3055                                         if (cs->host.ip != (0x7F000001)) // bypass loopback
3056                                         {
3057                                                 // check blacklist
3058                                                 addrCont clientAddr(cs->host.ip);
3059                                                 servMgr->IP_blacklist->lock();
3060                                                 if (servMgr->IP_blacklist->find(clientAddr))
3061                                                 {
3062                                                         // blacklisted
3063                                                         servMgr->IP_blacklist->unlock();
3064
3065                                                         LOG_DEBUG("REFUSED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3066                                                         cs->close();
3067                                                         sys->sleep(100);
3068
3069                                                         continue;
3070                                                 }
3071
3072                                                 servMgr->IP_blacklist->unlock();
3073                                                 LOG_DEBUG("ACCEPT: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3074
3075
3076                                                 // check graylist
3077                                                 servMgr->IP_graylist->lock();
3078                                                 size_t idx;
3079                                                 if (servMgr->IP_graylist->find(clientAddr, &idx))
3080                                                 {
3081                                                         // update
3082                                                         ++(servMgr->IP_graylist->at(idx));
3083                                                         LOG_DEBUG("UPDATE: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3084                                                 } else
3085                                                 {
3086                                                         // graylisted
3087                                                         servMgr->IP_graylist->push_back(clientAddr);
3088                                                         LOG_DEBUG("GRAYED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3089                                                 }
3090                                                 servMgr->IP_graylist->unlock();
3091                                         }
3092
3093                                         LOG_DEBUG("accepted incoming");
3094                                         Servent *ns = servMgr->allocServent();
3095                                         if (ns)
3096                                         {
3097                                                 servMgr->lastIncoming = sys->getTime();
3098                                                 ns->servPort = sv->sock->host.port;
3099                                                 ns->networkID = servMgr->networkID;
3100                                                 ns->initIncoming(cs,sv->allow);
3101                                         }else
3102                                                 LOG_ERROR("Out of servents");
3103                                 }
3104                         }
3105                         sys->sleep(10);
3106                 }
3107         }catch(StreamException &e)
3108         {
3109                 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3110         }
3111
3112         
3113         LOG_DEBUG("Server stopped");
3114
3115         sv->kill();
3116         sys->endThread(thread);
3117         return 0;
3118 }
3119
3120 // -----------------------------------
3121 int Servent::serverProc(ThreadInfo *thread) // Thread procedure wrapper: hands serverProcMain to the SEH_THREAD macro.
3122 {
3123         SEH_THREAD(serverProcMain, Servent::serverProc); // NOTE(review): macro presumably comes from win32/seh.h and appears to invoke serverProcMain(thread) under a structured-exception guard, supplying this function's return value -- confirm against the macro definition
3124 }
3125  
3126 // -----------------------------------
3127 bool    Servent::writeVariable(Stream &s, const String &var)
3128 {
3129         char buf[1024];
3130
3131         if (var == "type")
3132                 strcpy(buf,getTypeStr());
3133         else if (var == "status")
3134                 strcpy(buf,getStatusStr());
3135         else if (var == "address")
3136         {
3137                 if (servMgr->enableGetName) //JP-EX s
3138                 {
3139                         getHost().toStr(buf);
3140                         char h_ip[64];
3141                         Host h = getHost();
3142                         h.toStr(h_ip);
3143
3144 /*                      ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3145                         int numHits=0;
3146                         for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3147                         {
3148                                 ChanHitList *chl = &chanMgr->hitlists[i];
3149                                 if (chl->isUsed())
3150                                         hits[numHits++] = chl;
3151                         }
3152                         bool ishit,isfw;
3153                         ishit = isfw = false;
3154                         int numRelay = 0;
3155                         if (numHits) 
3156                         {
3157                                 for(int k=0; k<numHits; k++)
3158                                 {
3159                                         ChanHitList *chl = hits[k];
3160                                         if (chl->isUsed())
3161                                         {
3162                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3163                                                 {
3164                                                         ChanHit *hit = &chl->hits[j];
3165                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
3166                                                         {
3167                                                                 ishit = true;
3168                                                                 if (hit->firewalled)
3169                                                                         isfw = true;
3170                                                                 numRelay += hit->numRelays;
3171                                                         }
3172                                                 }
3173                                         }
3174                                 }
3175                         }
3176                         strcpy(buf,"");
3177                         if (ishit == true)
3178                         {
3179                                 if (isfw == true)
3180                                 {
3181                                         if (numRelay== 0)
3182                                                 strcat(buf,"<font color=red>");
3183                                         else 
3184                                                 strcat(buf,"<font color=orange>");
3185                                 }
3186                                 else
3187                                         strcat(buf,"<font color=green>");
3188                         }
3189                         strcat(buf,h_ip);
3190                         char h_name[128];
3191                         if (ClientSocket::getHostname(h_name,h.ip))
3192                         {
3193                                 strcat(buf,"[");
3194                                 strcat(buf,h_name);
3195                                 strcat(buf,"]");
3196                         }
3197                         if (ishit == true) 
3198                         {
3199                                 strcat(buf,"</font>");
3200                         }
3201                 } //JP-EX e*/
3202
3203
3204                         bool isfw = false;
3205                         bool isRelay = true;
3206                         int numRelay = 0;
3207                         ChanHitList *chl = chanMgr->findHitListByID(chanID);
3208                         if (chl){
3209                                 ChanHit *hit = chl->hit;
3210                                 while(hit){
3211                                         if (hit->host.isValid() && (h.ip == hit->host.ip)){
3212                                                 isfw = hit->firewalled;
3213                                                 isRelay = hit->relay;
3214                                                 numRelay = hit->numRelays;
3215                                                 break;
3216                                         }
3217                                         hit = hit->next;
3218                                 }
3219                         }
3220                         strcpy(buf, "");
3221                         if (isfw){
3222                                 if (numRelay == 0){
3223                                         strcat(buf,"<font color=red>");
3224                                 } else {
3225                                         strcat(buf,"<font color=orange>");
3226                                 }
3227                         } else {
3228                                 if (!isRelay){
3229                                         if (numRelay==0){
3230                                                 strcpy(buf,"<font color=purple>");
3231                                         } else {
3232                                                 strcpy(buf,"<font color=blue>");
3233                                         }
3234                                 } else {
3235                                         strcpy(buf,"<font color=green>");
3236                                 }
3237                         }
3238                         strcat(buf,h_ip);
3239                         char h_name[128];
3240                         if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD(BOF\91Î\8dô)
3241                         {
3242                                 strcat(buf,"[");
3243                                 strcat(buf,h_name);
3244                                 strcat(buf,"]");
3245                         }
3246                         strcat(buf,"</font>");
3247                 }
3248                 else 
3249                         getHost().toStr(buf);
3250         }
3251         else if (var == "agent")
3252                 strcpy(buf,agent.cstr());
3253         else if (var == "bitrate")
3254         {
3255                 if (sock)
3256                 {
3257                         unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3258                         sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3259                 }else
3260                         strcpy(buf,"0");
3261         }else if (var == "uptime")
3262         {
3263                 String uptime;
3264                 if (lastConnect)
3265                         uptime.setFromStopwatch(sys->getTime()-lastConnect);
3266                 else
3267                         uptime.set("-");
3268                 strcpy(buf,uptime.cstr());
3269         }else if (var.startsWith("gnet."))
3270         {
3271
3272                 float ctime = (float)(sys->getTime()-lastConnect);
3273                 if (var == "gnet.packetsIn")
3274                         sprintf(buf,"%d",gnuStream.packetsIn);
3275                 else if (var == "gnet.packetsInPerSec")
3276                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3277                 else if (var == "gnet.packetsOut")
3278                         sprintf(buf,"%d",gnuStream.packetsOut);
3279                 else if (var == "gnet.packetsOutPerSec")
3280                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3281                 else if (var == "gnet.normQueue")
3282                         sprintf(buf,"%d",outPacketsNorm.numPending());
3283                 else if (var == "gnet.priQueue")
3284                         sprintf(buf,"%d",outPacketsPri.numPending());
3285                 else if (var == "gnet.flowControl")
3286                         sprintf(buf,"%d",flowControl?1:0);
3287                 else if (var == "gnet.routeTime")
3288                 {
3289                         int nr = seenIDs.numUsed();
3290                         unsigned int tim = sys->getTime()-seenIDs.getOldest();
3291                 
3292                         String tstr;
3293                         tstr.setFromStopwatch(tim);
3294
3295                         if (nr)
3296                                 strcpy(buf,tstr.cstr());
3297                         else
3298                                 strcpy(buf,"-");
3299                 }
3300                 else
3301                         return false;
3302
3303         }else
3304                 return false;
3305
3306         s.writeString(buf);
3307
3308         return true;
3309 }