pack.writeRaw(in);
}
}
+
+// ------------------------------------------
+unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
+{
+ ChanPacket pack;
+ unsigned int len = 0, skip = 0;
+
+ while (outData.numPending())
+ {
+ outData.readPacketPri(pack);
+
+ if (size >= len + pack.len) {
+ len += pack.len;
+ pack.writeRaw(in);
+ } else {
+ skip++;
+ }
+ }
+ if (skip > 0)
+ LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
+
+ return len;
+}
+
// ------------------------------------------
int PCPStream::readPacket(Stream &in,Channel *)
{
ipNum = 1;
}
else if (id == PCP_HOST_NUML)
+ {
hit.numListeners = atom.readInt();
+ if (hit.numListeners > 10)
+ hit.numListeners = 10;
+ }
else if (id == PCP_HOST_NUMR)
+ {
hit.numRelays = atom.readInt();
+ if (hit.numRelays > 100)
+ hit.numRelays = 100;
+ }
else if (id == PCP_HOST_UPTIME)
hit.upTime = atom.readInt();
else if (id == PCP_HOST_OLDPOS)
if (hit.numHops == 1){
Servent *sv = servMgr->findServentByServentID(hit.servent_id);
- if (sv){
+ if (sv && sv->getHost().ip == hit.host.ip){
// LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
sv->waitPort = hit.host.port;
+ hit.lastSendSeq = sv->serventHit.lastSendSeq;
+ sv->serventHit = hit;
}
}
}
{
ChanHit hit;
readHostAtoms(atom,c,bcs,hit,false);
+ Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
if (hit.uphost.ip == 0){
// LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
if (bcs.numHops == 1){
hit.uphost.port = servMgr->serverHost.port;
hit.uphostHops = 1;
} else {
- Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
+ //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
if (sv){
hit.uphost.ip = sv->getHost().ip;
hit.uphost.port = sv->waitPort;
}
}
}
- int oldPos = pmem.pos;
- hit.writeAtoms(patom, hit.chanID);
- pmem.pos = oldPos;
- r = readAtom(patom,bcs);
+ if (sv &&
+ ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
+ && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
+ || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
+ || chanMgr->findParentHit(hit)))
+ {
+ int oldPos = pmem.pos;
+ hit.writeAtoms(patom, hit.chanID);
+ pmem.pos = oldPos;
+ r = readAtom(patom,bcs);
+ } else {
+ LOG_DEBUG("### Invalid bcst: hops=%d, ver=%d(VP%04d), ttl=%d",
+ bcs.numHops,ver,ver_vp,ttl);
+ ttl = 0;
+ }
} else {
// copy and process atoms
int oldPos = pmem.pos;
if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
{
+ pack.priority = 11 - bcs.numHops;
chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
}