OSDN Git Service

生命キャスト対策コード(VPdiff20071223)マージ。
[peercast-im/PeerCastIM.git] / PeerCast.root / PeerCast / core / common / channel.cpp
index 4ca13be..665f690 100644 (file)
@@ -537,6 +537,7 @@ int Channel::handshakeFetch()
        sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
        sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
        sock->writeLineF("%s %d",PCX_HS_PCP,1);
+       sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
 
        sock->writeLine("");
 
@@ -1475,8 +1476,11 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
 // -----------------------------------
 bool   Channel::checkBump()
 {
+       unsigned int maxIdleTime = 30;
+       if (isIndexTxt(this)) maxIdleTime = 60;
+
        if (!isBroadcasting() && (!sourceHost.tracker))
-               if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
+           if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime))
                {
                        LOG_ERROR("Channel Auto bumped");
                        bump = true;
@@ -1509,6 +1513,9 @@ int Channel::readStream(Stream &in,ChannelStream *source)
 
        unsigned int receiveStartTime = 0;
 
+       unsigned int ptime = 0;
+       unsigned int upsize = 0;
+
        try
        {
                while (thread.active && !peercastInst->isQuitting)
@@ -1573,7 +1580,14 @@ int Channel::readStream(Stream &in,ChannelStream *source)
                                }
                        }
 
-                       source->flush(in);
+                       unsigned int t = sys->getTime();
+                       if (t != ptime) {
+                               ptime = t;
+                               upsize = Servent::MAX_OUTWARD_SIZE;
+                       }
+
+                       unsigned int len = source->flushUb(in, upsize);
+                       upsize -= len;
 
                        sys->sleepIdle();
                }
@@ -1733,9 +1747,12 @@ void ChanPacket::init(ChanPacketv &p)
 {
        type = p.type;
        len = p.len;
+       if (len > MAX_DATALEN)
+               throw StreamException("Packet data too large");
        pos = p.pos;
        sync = p.sync;
        skip = p.skip;
+       priority = p.priority;
        memcpy(data, p.data, len);
 }
 // -----------------------------------
@@ -1748,6 +1765,7 @@ void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos)
        memcpy(data,p,len);
        pos = _pos;
        skip = false;
+       priority = 0;
 }
 // -----------------------------------
 void ChanPacket::writeRaw(Stream &out)
@@ -1970,10 +1988,34 @@ void    ChanPacketBuffer::readPacket(ChanPacket &pack)
        pack.init(packets[readPos%MAX_PACKETS]);
        readPos++;
        lock.off();
+}
 
-       sys->sleepIdle();
+// -----------------------------------
+void   ChanPacketBuffer::readPacketPri(ChanPacket &pack)
+{
+       unsigned int tim = sys->getTime();
+
+       if (readPos < firstPos) 
+               throw StreamException("Read too far behind");
+       while (readPos >= writePos)
+       {
+               sys->sleepIdle();
+               if ((sys->getTime() - tim) > 30)
+                       throw TimeoutException();
+       }
+       lock.on();
+       ChanPacketv *best = &packets[readPos % MAX_PACKETS];
+       for (unsigned int i = readPos + 1; i < writePos; i++) {
+               if (packets[i % MAX_PACKETS].priority > best->priority)
+                       best = &packets[i % MAX_PACKETS];
+       }
+       pack.init(*best);
+       best->init(packets[readPos % MAX_PACKETS]);
+       readPos++;
+       lock.off();
+ }
 
-}
 // -----------------------------------
 bool   ChanPacketBuffer::willSkip()
 {
@@ -2898,6 +2940,35 @@ ChanHit *ChanMgr::addHit(ChanHit &h)
 }
 
 // -----------------------------------
+bool ChanMgr::findParentHit(ChanHit &p)
+{
+       ChanHitList *hl=NULL;
+
+       chanMgr->hitlistlock.on();
+
+       hl = findHitListByID(p.chanID);
+
+       if (hl)
+       {
+               ChanHit *ch = hl->hit;
+               while (ch)
+               {
+                       if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip)
+                               && (ch->rhost[0].port == p.uphost.port))
+                       {
+                               chanMgr->hitlistlock.off();
+                               return 1;
+                       }
+                       ch = ch->next;
+               }
+       }
+
+       chanMgr->hitlistlock.off();
+
+       return 0;
+}
+
+// -----------------------------------
 class ChanFindInfo : public ThreadInfo
 {
 public:
@@ -3064,6 +3135,7 @@ void ChanHit::init()
        version_ex_number = 0;
 
        status = 0;
+       servent_id = 0;
 
        sessionID.clear();
        chanID.clear();
@@ -4503,6 +4575,22 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
        riSequence &= 0xffffff;
        seqLock.off();
 
+       Servent *s = servMgr->servents;
+       while (s) {
+               if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
+                       && s->chanID.isSame(chl->info.id)) {
+                               int i = index % MAX_RESULTS;
+                               if (index < MAX_RESULTS
+                                       || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
+                                               s->serventHit.lastSendSeq = seq;
+                                               tmpHit[i] = s->serventHit;
+                                               tmpHit[i].host = s->serventHit.rhost[0];
+                                               index++;
+                               }
+               }
+               s = s->next;
+       }
+
        ChanHit *hit = chl->hit;
 
        while(hit){
@@ -4534,6 +4622,7 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                                //rnd = (float)rand() / (float)RAND_MAX;
                                rnd = rand() % base;
                                if (hit->numHops == 1){
+#if 0
                                        if (tmpHit[index % MAX_RESULTS].numHops == 1){
                                                if (rnd < prob){
                                                        tmpHit[index % MAX_RESULTS] = *hit;
@@ -4545,8 +4634,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                                                tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
                                                index++;
                                        }
+#endif
                                } else {
-                                       if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
+                                       if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
                                                tmpHit[index % MAX_RESULTS] = *hit;
                                                tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
                                                index++;