sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
sock->writeLineF("%s %d",PCX_HS_PCP,1);
+ sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
sock->writeLine("");
// -----------------------------------
bool Channel::checkBump()
{
+ unsigned int maxIdleTime = 30;
+ if (isIndexTxt(this)) maxIdleTime = 60;
+
if (!isBroadcasting() && (!sourceHost.tracker))
- if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
+ if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime))
{
LOG_ERROR("Channel Auto bumped");
bump = true;
unsigned int receiveStartTime = 0;
+ unsigned int ptime = 0;
+ unsigned int upsize = 0;
+
try
{
while (thread.active && !peercastInst->isQuitting)
}
}
- source->flush(in);
+ unsigned int t = sys->getTime();
+ if (t != ptime) {
+ ptime = t;
+ upsize = Servent::MAX_OUTWARD_SIZE;
+ }
+
+ unsigned int len = source->flushUb(in, upsize);
+ upsize -= len;
sys->sleepIdle();
}
{
type = p.type;
len = p.len;
+ if (len > MAX_DATALEN)
+ throw StreamException("Packet data too large");
pos = p.pos;
sync = p.sync;
skip = p.skip;
+ priority = p.priority;
memcpy(data, p.data, len);
}
// -----------------------------------
memcpy(data,p,len);
pos = _pos;
skip = false;
+ priority = 0;
}
// -----------------------------------
void ChanPacket::writeRaw(Stream &out)
pack.init(packets[readPos%MAX_PACKETS]);
readPos++;
lock.off();
+}
- sys->sleepIdle();
+// -----------------------------------
+void ChanPacketBuffer::readPacketPri(ChanPacket &pack)
+{
+ unsigned int tim = sys->getTime();
+
+ if (readPos < firstPos)
+ throw StreamException("Read too far behind");
+
+ while (readPos >= writePos)
+ {
+ sys->sleepIdle();
+ if ((sys->getTime() - tim) > 30)
+ throw TimeoutException();
+ }
+ lock.on();
+ ChanPacketv *best = &packets[readPos % MAX_PACKETS];
+ for (unsigned int i = readPos + 1; i < writePos; i++) {
+ if (packets[i % MAX_PACKETS].priority > best->priority)
+ best = &packets[i % MAX_PACKETS];
+ }
+ pack.init(*best);
+ best->init(packets[readPos % MAX_PACKETS]);
+ readPos++;
+ lock.off();
+ }
-}
// -----------------------------------
bool ChanPacketBuffer::willSkip()
{
}
// -----------------------------------
+bool ChanMgr::findParentHit(ChanHit &p)
+{
+ ChanHitList *hl=NULL;
+
+ chanMgr->hitlistlock.on();
+
+ hl = findHitListByID(p.chanID);
+
+ if (hl)
+ {
+ ChanHit *ch = hl->hit;
+ while (ch)
+ {
+ if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip)
+ && (ch->rhost[0].port == p.uphost.port))
+ {
+ chanMgr->hitlistlock.off();
+ return 1;
+ }
+ ch = ch->next;
+ }
+ }
+
+ chanMgr->hitlistlock.off();
+
+ return 0;
+}
+
+// -----------------------------------
class ChanFindInfo : public ThreadInfo
{
public:
version_ex_number = 0;
status = 0;
+ servent_id = 0;
sessionID.clear();
chanID.clear();
riSequence &= 0xffffff;
seqLock.off();
+ Servent *s = servMgr->servents;
+ while (s) {
+ if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
+ && s->chanID.isSame(chl->info.id)) {
+ int i = index % MAX_RESULTS;
+ if (index < MAX_RESULTS
+ || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
+ s->serventHit.lastSendSeq = seq;
+ tmpHit[i] = s->serventHit;
+ tmpHit[i].host = s->serventHit.rhost[0];
+ index++;
+ }
+ }
+ s = s->next;
+ }
+
ChanHit *hit = chl->hit;
while(hit){
//rnd = (float)rand() / (float)RAND_MAX;
rnd = rand() % base;
if (hit->numHops == 1){
+#if 0
if (tmpHit[index % MAX_RESULTS].numHops == 1){
if (rnd < prob){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
}
+#endif
} else {
- if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
+ if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;