Blame | Last modification | View Log | Download
# Written by Bram Cohen# see LICENSE.txt for license informationfrom BitTornado.CurrentRateMeasure import Measurefrom BitTornado.bitfield import Bitfieldfrom random import shufflefrom BitTornado.clock import clocktry:Trueexcept:True = 1False = 0EXPIRE_TIME = 60 * 60class PerIPStats:def __init__(self, ip):self.numgood = 0self.bad = {}self.numconnections = 0self.lastdownload = Noneself.peerid = Noneclass BadDataGuard:def __init__(self, download):self.download = downloadself.ip = download.ipself.downloader = download.downloaderself.stats = self.downloader.perip[self.ip]self.lastindex = Nonedef failed(self, index, bump = False):self.stats.bad.setdefault(index, 0)self.downloader.gotbaddata[self.ip] = 1self.stats.bad[index] += 1if len(self.stats.bad) > 1:if self.download is not None:self.downloader.try_kick(self.download)elif self.stats.numconnections == 1 and self.stats.lastdownload is not None:self.downloader.try_kick(self.stats.lastdownload)if len(self.stats.bad) >= 3 and len(self.stats.bad) > int(self.stats.numgood/30):self.downloader.try_ban(self.ip)elif bump:self.downloader.picker.bump(index)def good(self, index):# lastindex is a hack to only increase numgood by one for each good# piece, however many chunks come from the connection(s) from this IPif index != self.lastindex:self.stats.numgood += 1self.lastindex = indexclass SingleDownload:def __init__(self, downloader, connection):self.downloader = downloaderself.connection = connectionself.choked = Trueself.interested = Falseself.active_requests = []self.measure = Measure(downloader.max_rate_period)self.peermeasure = Measure(downloader.max_rate_period)self.have = Bitfield(downloader.numpieces)self.last = -1000self.last2 = -1000self.example_interest = Noneself.backlog = 2self.ip = connection.get_ip()self.guard = BadDataGuard(self)def _backlog(self, just_unchoked):self.backlog = min(2+int(4*self.measure.get_rate()/self.downloader.chunksize),(2*just_unchoked)+self.downloader.queue_limit() )if self.backlog > 50:self.backlog = max(50, self.backlog * 0.075)return self.backlogdef disconnected(self):self.downloader.lost_peer(self)if self.have.complete():self.downloader.picker.lost_seed()else:for i in xrange(len(self.have)):if self.have[i]:self.downloader.picker.lost_have(i)if self.have.complete() and self.downloader.storage.is_endgame():self.downloader.add_disconnected_seed(self.connection.get_readable_id())self._letgo()self.guard.download = Nonedef _letgo(self):if self.downloader.queued_out.has_key(self):del self.downloader.queued_out[self]if not self.active_requests:returnif self.downloader.endgamemode:self.active_requests = []returnlost = {}for index, begin, length in self.active_requests:self.downloader.storage.request_lost(index, begin, length)lost[index] = 1lost = lost.keys()self.active_requests = []if self.downloader.paused:returnds = [d for d in self.downloader.downloads if not d.choked]shuffle(ds)for d in ds:d._request_more()for d in self.downloader.downloads:if d.choked and not d.interested:for l in lost:if d.have[l] and self.downloader.storage.do_I_have_requests(l):d.send_interested()breakdef got_choke(self):if not self.choked:self.choked = Trueself._letgo()def got_unchoke(self):if self.choked:self.choked = Falseif self.interested:self._request_more(new_unchoke = True)self.last2 = clock()def is_choked(self):return self.chokeddef is_interested(self):return self.interesteddef send_interested(self):if not self.interested:self.interested = Trueself.connection.send_interested()if not self.choked:self.last2 = clock()def send_not_interested(self):if self.interested:self.interested = Falseself.connection.send_not_interested()def got_piece(self, index, begin, piece):length = len(piece)try:self.active_requests.remove((index, begin, length))except ValueError:self.downloader.discarded += lengthreturn Falseif self.downloader.endgamemode:self.downloader.all_requests.remove((index, begin, length))self.last = clock()self.last2 = clock()self.measure.update_rate(length)self.downloader.measurefunc(length)if not self.downloader.storage.piece_came_in(index, begin, piece, self.guard):self.downloader.piece_flunked(index)return Falseif self.downloader.storage.do_I_have(index):self.downloader.picker.complete(index)if self.downloader.endgamemode:for d in self.downloader.downloads:if d is not self:if d.interested:if d.choked:assert not d.active_requestsd.fix_download_endgame()else:try:d.active_requests.remove((index, begin, length))except ValueError:continued.connection.send_cancel(index, begin, length)d.fix_download_endgame()else:assert not d.active_requestsself._request_more()self.downloader.check_complete(index)return self.downloader.storage.do_I_have(index)def _request_more(self, new_unchoke = False):assert not self.chokedif self.downloader.endgamemode:self.fix_download_endgame(new_unchoke)returnif self.downloader.paused:returnif len(self.active_requests) >= self._backlog(new_unchoke):if not (self.active_requests or self.backlog):self.downloader.queued_out[self] = 1returnlost_interests = []while len(self.active_requests) < self.backlog:interest = self.downloader.picker.next(self.have,self.downloader.storage.do_I_have_requests,self.downloader.too_many_partials())if interest is None:breakself.example_interest = interestself.send_interested()loop = Truewhile len(self.active_requests) < self.backlog and loop:begin, length = self.downloader.storage.new_request(interest)self.downloader.picker.requested(interest)self.active_requests.append((interest, begin, length))self.connection.send_request(interest, begin, length)self.downloader.chunk_requested(length)if not self.downloader.storage.do_I_have_requests(interest):loop = Falselost_interests.append(interest)if not self.active_requests:self.send_not_interested()if lost_interests:for d in self.downloader.downloads:if d.active_requests or not d.interested:continueif d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):continuefor lost in lost_interests:if d.have[lost]:breakelse:continueinterest = self.downloader.picker.next(d.have,self.downloader.storage.do_I_have_requests,self.downloader.too_many_partials())if interest is None:d.send_not_interested()else:d.example_interest = interestif self.downloader.storage.is_endgame():self.downloader.start_endgame()def fix_download_endgame(self, new_unchoke = False):if self.downloader.paused:returnif len(self.active_requests) >= self._backlog(new_unchoke):if not (self.active_requests or self.backlog) and not self.choked:self.downloader.queued_out[self] = 1returnwant = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]if not (self.active_requests or want):self.send_not_interested()returnif want:self.send_interested()if self.choked:returnshuffle(want)del want[self.backlog - len(self.active_requests):]self.active_requests.extend(want)for piece, begin, length in want:self.connection.send_request(piece, begin, length)self.downloader.chunk_requested(length)def got_have(self, index):if index == self.downloader.numpieces-1:self.downloader.totalmeasure.update_rate(self.downloader.storage.total_length-(self.downloader.numpieces-1)*self.downloader.storage.piece_length)self.peermeasure.update_rate(self.downloader.storage.total_length-(self.downloader.numpieces-1)*self.downloader.storage.piece_length)else:self.downloader.totalmeasure.update_rate(self.downloader.storage.piece_length)self.peermeasure.update_rate(self.downloader.storage.piece_length)if not self.have[index]:self.have[index] = Trueself.downloader.picker.got_have(index)if self.have.complete():self.downloader.picker.became_seed()if self.downloader.storage.am_I_complete():self.downloader.add_disconnected_seed(self.connection.get_readable_id())self.connection.close()elif self.downloader.endgamemode:self.fix_download_endgame()elif ( not self.downloader.pausedand not self.downloader.picker.is_blocked(index)and self.downloader.storage.do_I_have_requests(index) ):if not self.choked:self._request_more()else:self.send_interested()return self.have.complete()def _check_interests(self):if self.interested or self.downloader.paused:returnfor i in xrange(len(self.have)):if ( self.have[i] and not self.downloader.picker.is_blocked(i)and ( self.downloader.endgamemodeor self.downloader.storage.do_I_have_requests(i) ) ):self.send_interested()returndef got_have_bitfield(self, have):if self.downloader.storage.am_I_complete() and have.complete():if self.downloader.super_seeding:self.connection.send_bitfield(have.tostring()) # be nice, show you're a seed tooself.connection.close()self.downloader.add_disconnected_seed(self.connection.get_readable_id())return Falseself.have = haveif have.complete():self.downloader.picker.got_seed()else:for i in xrange(len(have)):if have[i]:self.downloader.picker.got_have(i)if self.downloader.endgamemode and not self.downloader.paused:for piece, begin, length in self.downloader.all_requests:if self.have[piece]:self.send_interested()breakelse:self._check_interests()return have.complete()def get_rate(self):return self.measure.get_rate()def is_snubbed(self):if ( self.interested and not self.chokedand clock() - self.last2 > self.downloader.snub_time ):for index, begin, length in self.active_requests:self.connection.send_cancel(index, begin, length)self.got_choke() # treat it just like a chokereturn clock() - self.last > self.downloader.snub_timeclass Downloader:def __init__(self, storage, picker, backlog, max_rate_period,numpieces, chunksize, measurefunc, snub_time,kickbans_ok, kickfunc, banfunc):self.storage = storageself.picker = pickerself.backlog = backlogself.max_rate_period = max_rate_periodself.measurefunc = measurefuncself.totalmeasure = Measure(max_rate_period*storage.piece_length/storage.request_size)self.numpieces = numpiecesself.chunksize = chunksizeself.snub_time = snub_timeself.kickfunc = kickfuncself.banfunc = banfuncself.disconnectedseeds = {}self.downloads = []self.perip = {}self.gotbaddata = {}self.kicked = {}self.banned = {}self.kickbans_ok = kickbans_okself.kickbans_halted = Falseself.super_seeding = Falseself.endgamemode = Falseself.endgame_queued_pieces = []self.all_requests = []self.discarded = 0L# self.download_rate = 25000 # 25K/s test rateself.download_rate = 0self.bytes_requested = 0self.last_time = clock()self.queued_out = {}self.requeueing = Falseself.paused = Falsedef set_download_rate(self, rate):self.download_rate = rate * 1000self.bytes_requested = 0def queue_limit(self):if not self.download_rate:return 10e10 # that's a big queue!t = clock()self.bytes_requested -= (t - self.last_time) * self.download_rateself.last_time = tif not self.requeueing and self.queued_out and self.bytes_requested < 0:self.requeueing = Trueq = self.queued_out.keys()shuffle(q)self.queued_out = {}for d in q:d._request_more()self.requeueing = Falseif -self.bytes_requested > 5*self.download_rate:self.bytes_requested = -5*self.download_ratereturn max(int(-self.bytes_requested/self.chunksize),0)def chunk_requested(self, size):self.bytes_requested += sizeexternal_data_received = chunk_requesteddef make_download(self, connection):ip = connection.get_ip()if self.perip.has_key(ip):perip = self.perip[ip]else:perip = self.perip.setdefault(ip, PerIPStats(ip))perip.peerid = connection.get_readable_id()perip.numconnections += 1d = SingleDownload(self, connection)perip.lastdownload = dself.downloads.append(d)return ddef piece_flunked(self, index):if self.paused:returnif self.endgamemode:if self.downloads:while self.storage.do_I_have_requests(index):nb, nl = self.storage.new_request(index)self.all_requests.append((index, nb, nl))for d in self.downloads:d.fix_download_endgame()returnself._reset_endgame()returnds = [d for d in self.downloads if not d.choked]shuffle(ds)for d in ds:d._request_more()ds = [d for d in self.downloads if not d.interested and d.have[index]]for d in ds:d.example_interest = indexd.send_interested()def has_downloaders(self):return len(self.downloads)def lost_peer(self, download):ip = download.ipself.perip[ip].numconnections -= 1if self.perip[ip].lastdownload == download:self.perip[ip].lastdownload = Noneself.downloads.remove(download)if self.endgamemode and not self.downloads: # all peers goneself._reset_endgame()def _reset_endgame(self):self.storage.reset_endgame(self.all_requests)self.endgamemode = Falseself.all_requests = []self.endgame_queued_pieces = []def add_disconnected_seed(self, id):# if not self.disconnectedseeds.has_key(id):# self.picker.seed_seen_recently()self.disconnectedseeds[id]=clock()# def expire_disconnected_seeds(self):def num_disconnected_seeds(self):# first expire old onesexpired = []for id,t in self.disconnectedseeds.items():if clock() - t > EXPIRE_TIME: #Expire old seeds after so longexpired.append(id)for id in expired:# self.picker.seed_disappeared()del self.disconnectedseeds[id]return len(self.disconnectedseeds)# if this isn't called by a stats-gathering function# it should be scheduled to run every minute or two.def _check_kicks_ok(self):if len(self.gotbaddata) > 10:self.kickbans_ok = Falseself.kickbans_halted = Truereturn self.kickbans_ok and len(self.downloads) > 2def try_kick(self, download):if self._check_kicks_ok():download.guard.download = Noneip = download.ipid = download.connection.get_readable_id()self.kicked[ip] = idself.perip[ip].peerid = idself.kickfunc(download.connection)def try_ban(self, ip):if self._check_kicks_ok():self.banfunc(ip)self.banned[ip] = self.perip[ip].peeridif self.kicked.has_key(ip):del self.kicked[ip]def set_super_seed(self):self.super_seeding = Truedef check_complete(self, index):if self.endgamemode and not self.all_requests:self.endgamemode = Falseif self.endgame_queued_pieces and not self.endgamemode:self.requeue_piece_download()if self.storage.am_I_complete():assert not self.all_requestsassert not self.endgamemodefor d in [i for i in self.downloads if i.have.complete()]:d.connection.send_have(index) # be nice, tell the other seed you completedself.add_disconnected_seed(d.connection.get_readable_id())d.connection.close()return Truereturn Falsedef too_many_partials(self):return len(self.storage.dirty) > (len(self.downloads)/2)def cancel_piece_download(self, pieces):if self.endgamemode:if self.endgame_queued_pieces:for piece in pieces:try:self.endgame_queued_pieces.remove(piece)except:passnew_all_requests = []for index, nb, nl in self.all_requests:if index in pieces:self.storage.request_lost(index, nb, nl)else:new_all_requests.append((index, nb, nl))self.all_requests = new_all_requestsfor d in self.downloads:hit = Falsefor index, nb, nl in d.active_requests:if index in pieces:hit = Trued.connection.send_cancel(index, nb, nl)if not self.endgamemode:self.storage.request_lost(index, nb, nl)if hit:d.active_requests = [ r for r in d.active_requestsif r[0] not in pieces ]d._request_more()if not self.endgamemode and d.choked:d._check_interests()def requeue_piece_download(self, pieces = []):if self.endgame_queued_pieces:for piece in pieces:if not piece in self.endgame_queued_pieces:self.endgame_queued_pieces.append(piece)pieces = self.endgame_queued_piecesif self.endgamemode:if self.all_requests:self.endgame_queued_pieces = piecesreturnself.endgamemode = Falseself.endgame_queued_pieces = Noneds = [d for d in self.downloads]shuffle(ds)for d in ds:if d.choked:d._check_interests()else:d._request_more()def start_endgame(self):assert not self.endgamemodeself.endgamemode = Trueassert not self.all_requestsfor d in self.downloads:if d.active_requests:assert d.interested and not d.chokedfor request in d.active_requests:assert not request in self.all_requestsself.all_requests.append(request)for d in self.downloads:d.fix_download_endgame()def pause(self, flag):self.paused = flagif flag:for d in self.downloads:for index, begin, length in d.active_requests:d.connection.send_cancel(index, begin, length)d._letgo()d.send_not_interested()if self.endgamemode:self._reset_endgame()else:shuffle(self.downloads)for d in self.downloads:d._check_interests()if d.interested and not d.choked:d._request_more()