# Blame | Last modification | View Log | Download
# Written by Bram Cohen# see LICENSE.txt for license informationfrom traceback import print_excfrom binascii import b2a_hexfrom clock import clockfrom CurrentRateMeasure import Measurefrom cStringIO import StringIOfrom math import sqrttry:Trueexcept:True = 1False = 0try:sum([1])except:sum = lambda a: reduce(lambda x,y: x+y, a, 0)DEBUG = FalseMAX_RATE_PERIOD = 20.0MAX_RATE = 10e10PING_BOUNDARY = 1.2PING_SAMPLES = 7PING_DISCARDS = 1PING_THRESHHOLD = 5PING_DELAY = 5 # cycles 'til first upward adjustmentPING_DELAY_NEXT = 2 # 'til nextADJUST_UP = 1.05ADJUST_DOWN = 0.95UP_DELAY_FIRST = 5UP_DELAY_NEXT = 2SLOTS_STARTING = 6SLOTS_FACTOR = 1.66/1000class RateLimiter:def __init__(self, sched, unitsize, slotsfunc = lambda x: None):self.sched = schedself.last = Noneself.unitsize = unitsizeself.slotsfunc = slotsfuncself.measure = Measure(MAX_RATE_PERIOD)self.autoadjust = Falseself.upload_rate = MAX_RATE * 1000self.slots = SLOTS_STARTING # garbage if not automaticdef set_upload_rate(self, rate):# rate = -1 # test automaticif rate < 0:if self.autoadjust:returnself.autoadjust = Trueself.autoadjustup = 0self.pings = []rate = MAX_RATEself.slots = SLOTS_STARTINGself.slotsfunc(self.slots)else:self.autoadjust = Falseif not rate:rate = MAX_RATEself.upload_rate = rate * 1000self.lasttime = clock()self.bytes_sent = 0def queue(self, conn):assert conn.next_upload is Noneif self.last is None:self.last = connconn.next_upload = connself.try_send(True)else:conn.next_upload = self.last.next_uploadself.last.next_upload = connself.last = conndef try_send(self, check_time = False):t = clock()self.bytes_sent -= (t - self.lasttime) * self.upload_rateself.lasttime = tif check_time:self.bytes_sent = max(self.bytes_sent, 0)cur = self.last.next_uploadwhile self.bytes_sent <= 0:bytes = cur.send_partial(self.unitsize)self.bytes_sent += bytesself.measure.update_rate(bytes)if bytes == 0 or cur.backlogged():if self.last is cur:self.last = Nonecur.next_upload = Nonebreakelse:self.last.next_upload = 
cur.next_uploadcur.next_upload = Nonecur = self.last.next_uploadelse:self.last = curcur = cur.next_uploadelse:self.sched(self.try_send, self.bytes_sent / self.upload_rate)def adjust_sent(self, bytes):self.bytes_sent = min(self.bytes_sent+bytes, self.upload_rate*3)self.measure.update_rate(bytes)def ping(self, delay):if DEBUG:print delayif not self.autoadjust:returnself.pings.append(delay > PING_BOUNDARY)if len(self.pings) < PING_SAMPLES+PING_DISCARDS:returnif DEBUG:print 'cycle'pings = sum(self.pings[PING_DISCARDS:])del self.pings[:]if pings >= PING_THRESHHOLD: # assume floodedif self.upload_rate == MAX_RATE:self.upload_rate = self.measure.get_rate()*ADJUST_DOWNelse:self.upload_rate = min(self.upload_rate,self.measure.get_rate()*1.1)self.upload_rate = max(int(self.upload_rate*ADJUST_DOWN),2)self.slots = int(sqrt(self.upload_rate*SLOTS_FACTOR))self.slotsfunc(self.slots)if DEBUG:print 'adjust down to '+str(self.upload_rate)self.lasttime = clock()self.bytes_sent = 0self.autoadjustup = UP_DELAY_FIRSTelse: # not floodedif self.upload_rate == MAX_RATE:returnself.autoadjustup -= 1if self.autoadjustup:returnself.upload_rate = int(self.upload_rate*ADJUST_UP)self.slots = int(sqrt(self.upload_rate*SLOTS_FACTOR))self.slotsfunc(self.slots)if DEBUG:print 'adjust up to '+str(self.upload_rate)self.lasttime = clock()self.bytes_sent = 0self.autoadjustup = UP_DELAY_NEXT