OSDN Git Service

生命キャスト対策コード(VPdiff20071223)マージ。
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / pcp.cpp
index fdf5d8b..08e8485 100644 (file)
@@ -84,6 +84,30 @@ void PCPStream::flush(Stream &in)
                pack.writeRaw(in);
        }
 }
+
+// ------------------------------------------
+unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
+{
+       ChanPacket pack;
+       unsigned int len = 0, skip = 0;
+
+       while (outData.numPending())
+       {
+               outData.readPacketPri(pack);
+
+               if (size >= len + pack.len) {
+                       len += pack.len;
+                       pack.writeRaw(in);
+               } else {
+                       skip++;
+               }
+       }
+       if (skip > 0)
+               LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
+
+       return len;
+}
+
 // ------------------------------------------
 int PCPStream::readPacket(Stream &in,Channel *)
 {
@@ -433,9 +457,17 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C
                                ipNum = 1;
                }
                else if (id == PCP_HOST_NUML)
+               {
                        hit.numListeners = atom.readInt();
+                       if (hit.numListeners > 10)
+                               hit.numListeners = 10;
+               }
                else if (id == PCP_HOST_NUMR)
+               {
                        hit.numRelays = atom.readInt();
+                       if (hit.numRelays > 100)
+                               hit.numRelays = 100;
+               }
                else if (id == PCP_HOST_UPTIME)
                        hit.upTime = atom.readInt();
                else if (id == PCP_HOST_OLDPOS)
@@ -500,9 +532,11 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C
 
        if (hit.numHops == 1){
                Servent *sv = servMgr->findServentByServentID(hit.servent_id);
-               if (sv){
+               if (sv && sv->getHost().ip == hit.host.ip){
 //                     LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
                        sv->waitPort = hit.host.port;
+                       hit.lastSendSeq = sv->serventHit.lastSendSeq;
+                       sv->serventHit = hit;
                }
        }
 }
@@ -696,6 +730,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
                {
                        ChanHit hit;
                        readHostAtoms(atom,c,bcs,hit,false);
+                       Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
                        if (hit.uphost.ip == 0){
 //                             LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
                                if (bcs.numHops == 1){
@@ -703,7 +738,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
                                        hit.uphost.port = servMgr->serverHost.port;
                                        hit.uphostHops = 1;
                                } else {
-                                       Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
+                                       //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
                                        if (sv){
                                                hit.uphost.ip = sv->getHost().ip;
                                                hit.uphost.port = sv->waitPort;
@@ -711,10 +746,21 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
                                        }
                                }
                        }
-                       int oldPos = pmem.pos;
-                       hit.writeAtoms(patom, hit.chanID);
-                       pmem.pos = oldPos;
-                       r = readAtom(patom,bcs);
+                       if (sv &&
+                               ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
+                               && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
+                               || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
+                               || chanMgr->findParentHit(hit)))
+                       {
+                               int oldPos = pmem.pos;
+                               hit.writeAtoms(patom, hit.chanID);
+                               pmem.pos = oldPos;
+                               r = readAtom(patom,bcs);
+                       } else {
+                               LOG_DEBUG("### Invalid bcst: hops=%d, ver=%d(VP%04d), ttl=%d",
+                                       bcs.numHops,ver,ver_vp,ttl);
+                               ttl = 0;
+                       }
                } else {
                        // copy and process atoms
                        int oldPos = pmem.pos;
@@ -761,6 +807,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
 
                if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
                {
+                       pack.priority = 11 - bcs.numHops;
                        chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
                }