Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
3
 
4
from BitTornado.CurrentRateMeasure import Measure
5
from BitTornado.bitfield import Bitfield
6
from random import shuffle
7
from BitTornado.clock import clock
8
# Compatibility shim for interpreters that do not provide the True/False
# names.  Looking up a missing name can only raise NameError, so only that
# is caught.  The fallback values are stored through globals() rather than
# with a plain `True = 1` statement: on interpreters where True is a keyword
# that statement is a syntax error and would stop the module from compiling
# even though this branch is never taken there.
try:
    True
except NameError:
    globals()['True'] = 1
    globals()['False'] = 0

# Maximum age of an entry in Downloader.disconnectedseeds, measured as a
# difference of clock() readings (presumably seconds, i.e. one hour --
# confirm against BitTornado.clock).
EXPIRE_TIME = 60 * 60
15
 
16
class PerIPStats:
    """Bookkeeping record kept by the Downloader for one peer IP address.

    Attributes:
        numgood        -- counter incremented by BadDataGuard.good()
        bad            -- dict mapping piece index -> number of failures
                          recorded by BadDataGuard.failed()
        numconnections -- number of live connections from this IP
        lastdownload   -- most recently created SingleDownload, or None
        peerid         -- readable peer id last seen for this IP, or None
    """

    def __init__(self, ip):
        # `ip` is accepted for the caller's convenience but is not stored.
        self.peerid = None
        self.lastdownload = None
        self.numconnections = 0
        self.numgood = 0
        self.bad = {}
23
 
24
class BadDataGuard:
    """Records good/bad piece results for one download's IP address.

    The guard shares the PerIPStats entry of the download's IP and asks the
    downloader to kick or ban the peer once enough distinct pieces failed.
    `download` is reset to None by the owner once the connection is gone
    or has been kicked.
    """

    def __init__(self, download):
        self.download = download
        self.ip = download.ip
        self.downloader = download.downloader
        self.stats = self.downloader.perip[self.ip]
        self.lastindex = None

    def failed(self, index, bump = False):
        """Record that piece `index`, fed by this IP, failed.

        With more than one distinct bad piece the connection is offered to
        try_kick(); with three or more (and more than numgood/30) the IP is
        offered to try_ban().  If no ban was attempted and `bump` is true,
        the piece picker is asked to bump the piece.
        """
        stats = self.stats
        downloader = self.downloader
        bad = stats.bad

        bad[index] = bad.get(index, 0) + 1
        downloader.gotbaddata[self.ip] = 1

        distinct_bad = len(bad)
        if distinct_bad > 1:
            if self.download is not None:
                downloader.try_kick(self.download)
            elif stats.numconnections == 1 and stats.lastdownload is not None:
                # our own connection is gone; kick the one remaining
                # connection from the same IP instead
                downloader.try_kick(stats.lastdownload)

        if distinct_bad >= 3 and distinct_bad > stats.numgood // 30:
            downloader.try_ban(self.ip)
        elif bump:
            downloader.picker.bump(index)

    def good(self, index):
        """Record a good piece, counting consecutive reports of the same
        index only once."""
        # lastindex is a hack to only increase numgood by one for each good
        # piece, however many chunks come from the connection(s) from this IP
        if index == self.lastindex:
            return
        self.lastindex = index
        self.stats.numgood += 1
52
 
53
class SingleDownload:
    """Download-side state for a single peer connection.

    Tracks whether the peer chokes us, whether we have declared interest,
    which pieces the peer has (`have`) and which chunk requests are
    currently outstanding on the connection (`active_requests`, a list of
    (index, begin, length) tuples).
    """

    def __init__(self, downloader, connection):
        self.downloader = downloader
        self.connection = connection
        self.choked = True
        self.interested = False
        self.active_requests = []
        self.measure = Measure(downloader.max_rate_period)
        self.peermeasure = Measure(downloader.max_rate_period)
        self.have = Bitfield(downloader.numpieces)
        # `last` is refreshed only when a requested chunk arrives; `last2`
        # is additionally refreshed on unchoke and when interest is sent
        # while unchoked.  Both are consulted by is_snubbed().
        self.last = -1000
        self.last2 = -1000
        self.example_interest = None
        self.backlog = 2
        self.ip = connection.get_ip()
        self.guard = BadDataGuard(self)

    def _backlog(self, just_unchoked):
        """Recompute, store and return the target number of outstanding
        requests.

        The target is 2 + 4*rate/chunksize, capped by the downloader's
        queue limit (plus 2 right after an unchoke).  Targets above 50 are
        scaled down to 7.5% with a floor of 50.
        """
        rate_based = 2 + int(4 * self.measure.get_rate() / self.downloader.chunksize)
        limit_based = (2 * just_unchoked) + self.downloader.queue_limit()
        self.backlog = min(rate_based, limit_based)
        if self.backlog > 50:
            # Truncate to an int: the backlog is later used as a slice
            # bound in fix_download_endgame(), where a float is an error.
            self.backlog = max(50, int(self.backlog * 0.075))
        return self.backlog

    def disconnected(self):
        """Tear down this download after its connection was lost."""
        downloader = self.downloader
        downloader.lost_peer(self)
        if self.have.complete():
            downloader.picker.lost_seed()
        else:
            for i in xrange(len(self.have)):
                if self.have[i]:
                    downloader.picker.lost_have(i)
        if self.have.complete() and downloader.storage.is_endgame():
            downloader.add_disconnected_seed(self.connection.get_readable_id())
        self._letgo()
        self.guard.download = None

    def _letgo(self):
        """Give up every outstanding request on this connection.

        Outside endgame mode the requests are handed back to storage, the
        unchoked downloads are asked (in random order) to request more, and
        choked, uninterested peers holding one of the released pieces are
        sent an interested message.
        """
        downloader = self.downloader
        if self in downloader.queued_out:
            del downloader.queued_out[self]
        if not self.active_requests:
            return
        if downloader.endgamemode:
            # in endgame the requests remain listed in all_requests
            self.active_requests = []
            return
        lost = {}
        for index, begin, length in self.active_requests:
            downloader.storage.request_lost(index, begin, length)
            lost[index] = 1
        lost = list(lost.keys())
        self.active_requests = []
        if downloader.paused:
            return
        unchoked = [d for d in downloader.downloads if not d.choked]
        shuffle(unchoked)
        for d in unchoked:
            d._request_more()
        for d in downloader.downloads:
            if d.choked and not d.interested:
                for piece in lost:
                    if d.have[piece] and downloader.storage.do_I_have_requests(piece):
                        d.send_interested()
                        break

    def got_choke(self):
        """Handle a choke message from the peer."""
        if not self.choked:
            self.choked = True
            self._letgo()

    def got_unchoke(self):
        """Handle an unchoke message from the peer."""
        if self.choked:
            self.choked = False
            if self.interested:
                self._request_more(new_unchoke = True)
            self.last2 = clock()

    def is_choked(self):
        return self.choked

    def is_interested(self):
        return self.interested

    def send_interested(self):
        """Declare interest to the peer unless already declared."""
        if not self.interested:
            self.interested = True
            self.connection.send_interested()
            if not self.choked:
                self.last2 = clock()

    def send_not_interested(self):
        """Withdraw interest from the peer if it was declared."""
        if self.interested:
            self.interested = False
            self.connection.send_not_interested()

    def got_piece(self, index, begin, piece):
        """Handle an incoming chunk of piece `index` starting at `begin`.

        Returns a false value when the chunk was not requested (its length
        is added to downloader.discarded) or when storage rejected it;
        otherwise returns storage.do_I_have(index).
        """
        downloader = self.downloader
        length = len(piece)
        request = (index, begin, length)
        try:
            self.active_requests.remove(request)
        except ValueError:
            downloader.discarded += length
            return False
        if downloader.endgamemode:
            downloader.all_requests.remove(request)
        self.last = clock()
        self.last2 = clock()
        self.measure.update_rate(length)
        downloader.measurefunc(length)
        if not downloader.storage.piece_came_in(index, begin, piece, self.guard):
            downloader.piece_flunked(index)
            return False
        if downloader.storage.do_I_have(index):
            downloader.picker.complete(index)
        if downloader.endgamemode:
            # The same chunk may be outstanding on other connections:
            # cancel it there and let those downloads pick new work.
            for d in downloader.downloads:
                if d is self:
                    continue
                if not d.interested:
                    assert not d.active_requests
                    continue
                if d.choked:
                    assert not d.active_requests
                    d.fix_download_endgame()
                    continue
                try:
                    d.active_requests.remove(request)
                except ValueError:
                    continue
                d.connection.send_cancel(index, begin, length)
                d.fix_download_endgame()
        self._request_more()
        downloader.check_complete(index)
        return downloader.storage.do_I_have(index)

    def _request_more(self, new_unchoke = False):
        """Fill the request pipeline up to the current backlog.

        Must only be called while unchoked.  In endgame mode the work is
        delegated to fix_download_endgame().
        """
        assert not self.choked
        downloader = self.downloader
        if downloader.endgamemode:
            self.fix_download_endgame(new_unchoke)
            return
        if downloader.paused:
            return
        if len(self.active_requests) >= self._backlog(new_unchoke):
            if not (self.active_requests or self.backlog):
                # backlog is 0: park this download until
                # Downloader.queue_limit() re-queues it
                downloader.queued_out[self] = 1
            return
        lost_interests = []
        while len(self.active_requests) < self.backlog:
            interest = downloader.picker.next(self.have,
                               downloader.storage.do_I_have_requests,
                               downloader.too_many_partials())
            if interest is None:
                break
            self.example_interest = interest
            self.send_interested()
            loop = True
            while len(self.active_requests) < self.backlog and loop:
                begin, length = downloader.storage.new_request(interest)
                downloader.picker.requested(interest)
                self.active_requests.append((interest, begin, length))
                self.connection.send_request(interest, begin, length)
                downloader.chunk_requested(length)
                if not downloader.storage.do_I_have_requests(interest):
                    # every chunk of this piece is now requested
                    loop = False
                    lost_interests.append(interest)
        if not self.active_requests:
            self.send_not_interested()
        if lost_interests:
            # Idle but interested peers may have been interested only in
            # pieces that just became fully requested: re-evaluate them.
            for d in downloader.downloads:
                if d.active_requests or not d.interested:
                    continue
                if ( d.example_interest is not None
                     and downloader.storage.do_I_have_requests(d.example_interest) ):
                    continue
                for lost in lost_interests:
                    if d.have[lost]:
                        break
                else:
                    continue
                interest = downloader.picker.next(d.have,
                                   downloader.storage.do_I_have_requests,
                                   downloader.too_many_partials())
                if interest is None:
                    d.send_not_interested()
                else:
                    d.example_interest = interest
        if downloader.storage.is_endgame():
            downloader.start_endgame()

    def fix_download_endgame(self, new_unchoke = False):
        """Endgame counterpart of _request_more().

        Picks, at random, entries of downloader.all_requests that the peer
        can serve and that are not already outstanding on this connection,
        up to the current backlog, and adjusts interest accordingly.
        """
        downloader = self.downloader
        if downloader.paused:
            return
        if len(self.active_requests) >= self._backlog(new_unchoke):
            if not (self.active_requests or self.backlog) and not self.choked:
                downloader.queued_out[self] = 1
            return
        want = [a for a in downloader.all_requests
                if self.have[a[0]] and a not in self.active_requests]
        if not (self.active_requests or want):
            self.send_not_interested()
            return
        if want:
            self.send_interested()
        if self.choked:
            return
        shuffle(want)
        # keep only as many requests as fit in the remaining backlog
        del want[self.backlog - len(self.active_requests):]
        self.active_requests.extend(want)
        for piece, begin, length in want:
            self.connection.send_request(piece, begin, length)
            downloader.chunk_requested(length)

    def got_have(self, index):
        """Handle a HAVE message; returns whether the peer is now complete."""
        downloader = self.downloader
        storage = downloader.storage
        if index == downloader.numpieces-1:
            # the final piece holds whatever remains after the
            # (numpieces - 1) preceding full-length pieces
            piece_size = storage.total_length - (downloader.numpieces-1)*storage.piece_length
        else:
            piece_size = storage.piece_length
        downloader.totalmeasure.update_rate(piece_size)
        self.peermeasure.update_rate(piece_size)
        if not self.have[index]:
            self.have[index] = True
            downloader.picker.got_have(index)
            if self.have.complete():
                downloader.picker.became_seed()
                if storage.am_I_complete():
                    downloader.add_disconnected_seed(self.connection.get_readable_id())
                    self.connection.close()
            elif downloader.endgamemode:
                self.fix_download_endgame()
            elif ( not downloader.paused
                   and not downloader.picker.is_blocked(index)
                   and storage.do_I_have_requests(index) ):
                if not self.choked:
                    self._request_more()
                else:
                    self.send_interested()
        return self.have.complete()

    def _check_interests(self):
        """Send interested if the peer has any piece that is not blocked
        and that we still need (in endgame mode: any unblocked piece)."""
        downloader = self.downloader
        if self.interested or downloader.paused:
            return
        for i in xrange(len(self.have)):
            if ( self.have[i] and not downloader.picker.is_blocked(i)
                 and ( downloader.endgamemode
                       or downloader.storage.do_I_have_requests(i) ) ):
                self.send_interested()
                return

    def got_have_bitfield(self, have):
        """Handle the peer's initial bitfield.

        When both sides are complete the connection is closed and a false
        value returned; otherwise returns whether the peer is complete.
        """
        downloader = self.downloader
        if downloader.storage.am_I_complete() and have.complete():
            if downloader.super_seeding:
                self.connection.send_bitfield(have.tostring()) # be nice, show you're a seed too
            # Read the id before closing, in the same order as got_have()
            # and Downloader.check_complete().
            downloader.add_disconnected_seed(self.connection.get_readable_id())
            self.connection.close()
            return False
        self.have = have
        if have.complete():
            downloader.picker.got_seed()
        else:
            for i in xrange(len(have)):
                if have[i]:
                    downloader.picker.got_have(i)
        if downloader.endgamemode and not downloader.paused:
            for piece, begin, length in downloader.all_requests:
                if self.have[piece]:
                    self.send_interested()
                    break
        else:
            self._check_interests()
        return have.complete()

    def get_rate(self):
        return self.measure.get_rate()

    def is_snubbed(self):
        """Return whether no requested chunk arrived within snub_time.

        Side effect: if we are interested and unchoked but `last2` is older
        than snub_time, all outstanding requests are cancelled and the
        connection is handled as if the peer had choked us.
        """
        if ( self.interested and not self.choked
             and clock() - self.last2 > self.downloader.snub_time ):
            for index, begin, length in self.active_requests:
                self.connection.send_cancel(index, begin, length)
            self.got_choke()    # treat it just like a choke
        return clock() - self.last > self.downloader.snub_time
331
 
332
 
333
class Downloader:
    """Coordinates the SingleDownload objects of all peer connections.

    Owns the shared request state (endgame mode, request-rate limiting,
    pause flag) and the per-IP statistics used to kick or ban peers that
    supplied bad data.
    """

    def __init__(self, storage, picker, backlog, max_rate_period,
                 numpieces, chunksize, measurefunc, snub_time,
                 kickbans_ok, kickfunc, banfunc):
        self.storage = storage
        self.picker = picker
        self.backlog = backlog
        self.max_rate_period = max_rate_period
        self.measurefunc = measurefunc
        self.totalmeasure = Measure(max_rate_period*storage.piece_length/storage.request_size)
        self.numpieces = numpieces
        self.chunksize = chunksize
        self.snub_time = snub_time
        self.kickfunc = kickfunc
        self.banfunc = banfunc
        self.disconnectedseeds = {}     # readable id -> clock() when recorded
        self.downloads = []             # live SingleDownload objects
        self.perip = {}                 # ip -> PerIPStats
        self.gotbaddata = {}            # ip -> 1 for IPs that supplied bad data
        self.kicked = {}                # ip -> readable id of the kicked peer
        self.banned = {}                # ip -> readable id of the banned peer
        self.kickbans_ok = kickbans_ok
        self.kickbans_halted = False
        self.super_seeding = False
        self.endgamemode = False
        self.endgame_queued_pieces = []
        self.all_requests = []          # (index, begin, length) in endgame mode
        # a plain int is sufficient: it is promoted automatically if the
        # counter outgrows the machine word
        self.discarded = 0
#        self.download_rate = 25000  # 25K/s test rate
        self.download_rate = 0
        self.bytes_requested = 0
        self.last_time = clock()
        self.queued_out = {}            # downloads parked by the rate limit
        self.requeueing = False
        self.paused = False

    def set_download_rate(self, rate):
        """Set the request rate limit; `rate` is multiplied by 1000 and a
        value of 0 disables limiting."""
        self.download_rate = rate * 1000
        self.bytes_requested = 0

    def queue_limit(self):
        """Return how many more chunks may be requested right now.

        Without a rate limit this is effectively unbounded.  Otherwise the
        requested-bytes balance is reduced by the elapsed time times the
        rate, parked downloads are re-queued once the balance is negative,
        and the credit is capped at five time units' worth of data.
        """
        if not self.download_rate:
            return 10e10    # that's a big queue!
        t = clock()
        self.bytes_requested -= (t - self.last_time) * self.download_rate
        self.last_time = t
        if not self.requeueing and self.queued_out and self.bytes_requested < 0:
            # `requeueing` guards against re-entry: _request_more() calls
            # back into queue_limit() through SingleDownload._backlog().
            self.requeueing = True
            try:
                q = list(self.queued_out.keys())
                shuffle(q)
                self.queued_out = {}
                for d in q:
                    d._request_more()
            finally:
                self.requeueing = False
        if -self.bytes_requested > 5*self.download_rate:
            self.bytes_requested = -5*self.download_rate
        return max(int(-self.bytes_requested/self.chunksize),0)

    def chunk_requested(self, size):
        """Charge `size` bytes against the request-rate balance."""
        self.bytes_requested += size

    external_data_received = chunk_requested

    def make_download(self, connection):
        """Create, register and return the SingleDownload for `connection`."""
        ip = connection.get_ip()
        if ip not in self.perip:
            self.perip[ip] = PerIPStats(ip)
        perip = self.perip[ip]
        perip.peerid = connection.get_readable_id()
        perip.numconnections += 1
        d = SingleDownload(self, connection)
        perip.lastdownload = d
        self.downloads.append(d)
        return d

    def piece_flunked(self, index):
        """React to piece `index` having been rejected by storage."""
        if self.paused:
            return
        if self.endgamemode:
            if self.downloads:
                # put every chunk of the piece back on the endgame list
                while self.storage.do_I_have_requests(index):
                    nb, nl = self.storage.new_request(index)
                    self.all_requests.append((index, nb, nl))
                for d in self.downloads:
                    d.fix_download_endgame()
                return
            self._reset_endgame()
            return
        ds = [d for d in self.downloads if not d.choked]
        shuffle(ds)
        for d in ds:
            d._request_more()
        ds = [d for d in self.downloads if not d.interested and d.have[index]]
        for d in ds:
            d.example_interest = index
            d.send_interested()

    def has_downloaders(self):
        """Return the number of live downloads (usable as a truth value)."""
        return len(self.downloads)

    def lost_peer(self, download):
        """Unregister `download` after its connection was lost."""
        stats = self.perip[download.ip]
        stats.numconnections -= 1
        if stats.lastdownload is download:
            stats.lastdownload = None
        self.downloads.remove(download)
        if self.endgamemode and not self.downloads: # all peers gone
            self._reset_endgame()

    def _reset_endgame(self):
        """Leave endgame mode, handing all pending requests back to storage."""
        self.storage.reset_endgame(self.all_requests)
        self.endgamemode = False
        self.all_requests = []
        self.endgame_queued_pieces = []

    def add_disconnected_seed(self, id):
        """Remember (or refresh) a seed we are no longer connected to."""
#        if not self.disconnectedseeds.has_key(id):
#            self.picker.seed_seen_recently()
        self.disconnectedseeds[id]=clock()

#	def expire_disconnected_seeds(self):

    def num_disconnected_seeds(self):
        """Expire entries older than EXPIRE_TIME and return how many remain.

        If this isn't called by a stats-gathering function it should be
        scheduled to run every minute or two.
        """
        # first expire old ones
        now = clock()
        expired = [seed for seed, t in self.disconnectedseeds.items()
                   if now - t > EXPIRE_TIME]    #Expire old seeds after so long
        for seed in expired:
#            self.picker.seed_disappeared()
            del self.disconnectedseeds[seed]
        return len(self.disconnectedseeds)

    def _check_kicks_ok(self):
        """Return whether kicking/banning is currently permitted.

        Once more than 10 IPs have supplied bad data, kicks and bans are
        switched off permanently and `kickbans_halted` is set.
        """
        if len(self.gotbaddata) > 10:
            self.kickbans_ok = False
            self.kickbans_halted = True
        return self.kickbans_ok and len(self.downloads) > 2

    def try_kick(self, download):
        """Kick the connection behind `download` if kicks are permitted."""
        if self._check_kicks_ok():
            download.guard.download = None
            ip = download.ip
            peerid = download.connection.get_readable_id()
            self.kicked[ip] = peerid
            self.perip[ip].peerid = peerid
            self.kickfunc(download.connection)

    def try_ban(self, ip):
        """Ban `ip` if bans are permitted."""
        if self._check_kicks_ok():
            self.banfunc(ip)
            self.banned[ip] = self.perip[ip].peerid
            if ip in self.kicked:
                del self.kicked[ip]

    def set_super_seed(self):
        self.super_seeding = True

    def check_complete(self, index):
        """Post-piece housekeeping; returns whether the download is complete.

        Leaves endgame mode once no requests remain, services pieces that
        were queued during endgame, and on completion closes connections to
        peers that are themselves complete.
        """
        if self.endgamemode and not self.all_requests:
            self.endgamemode = False
        if self.endgame_queued_pieces and not self.endgamemode:
            self.requeue_piece_download()
        if self.storage.am_I_complete():
            assert not self.all_requests
            assert not self.endgamemode
            for d in [i for i in self.downloads if i.have.complete()]:
                d.connection.send_have(index)   # be nice, tell the other seed you completed
                self.add_disconnected_seed(d.connection.get_readable_id())
                d.connection.close()
            return True
        return False

    def too_many_partials(self):
        """Return whether storage.dirty has more entries than half the
        number of downloads (rounded down)."""
        return len(self.storage.dirty) > (len(self.downloads)//2)

    def cancel_piece_download(self, pieces):
        """Cancel every outstanding request for the given piece indices."""
        if self.endgamemode:
            if self.endgame_queued_pieces:
                for piece in pieces:
                    try:
                        self.endgame_queued_pieces.remove(piece)
                    except ValueError:
                        # the piece was not queued; nothing to undo
                        pass
            new_all_requests = []
            for index, nb, nl in self.all_requests:
                if index in pieces:
                    self.storage.request_lost(index, nb, nl)
                else:
                    new_all_requests.append((index, nb, nl))
            self.all_requests = new_all_requests

        for d in self.downloads:
            hit = False
            for index, nb, nl in d.active_requests:
                if index in pieces:
                    hit = True
                    d.connection.send_cancel(index, nb, nl)
                    if not self.endgamemode:
                        # in endgame mode the request was already handed
                        # back through all_requests above
                        self.storage.request_lost(index, nb, nl)
            if hit:
                d.active_requests = [ r for r in d.active_requests
                                      if r[0] not in pieces ]
                d._request_more()
            if not self.endgamemode and d.choked:
                d._check_interests()

    def requeue_piece_download(self, pieces = None):
        """Make `pieces` eligible for download again.

        While endgame requests are still pending the pieces are only queued
        in `endgame_queued_pieces`.  Otherwise endgame mode (if any) is
        left, the queue is cleared and every download is asked to
        re-evaluate its interest or request more.
        """
        if pieces is None:
            pieces = []
        if self.endgame_queued_pieces:
            for piece in pieces:
                if piece not in self.endgame_queued_pieces:
                    self.endgame_queued_pieces.append(piece)
            pieces = self.endgame_queued_pieces
        if self.endgamemode:
            if self.all_requests:
                # store a copy so later appends never touch the caller's list
                self.endgame_queued_pieces = list(pieces)
                return
            self.endgamemode = False
        # The queue is being serviced now.  Keep it a list (never None) and
        # empty, so check_complete() does not re-trigger on stale entries.
        self.endgame_queued_pieces = []

        ds = list(self.downloads)
        shuffle(ds)
        for d in ds:
            if d.choked:
                d._check_interests()
            else:
                d._request_more()

    def start_endgame(self):
        """Enter endgame mode, pooling all outstanding requests into
        `all_requests` so every capable peer can be asked for them."""
        assert not self.endgamemode
        self.endgamemode = True
        assert not self.all_requests
        for d in self.downloads:
            if d.active_requests:
                assert d.interested and not d.choked
            for request in d.active_requests:
                assert request not in self.all_requests
                self.all_requests.append(request)
        for d in self.downloads:
            d.fix_download_endgame()

    def pause(self, flag):
        """Pause (true `flag`) or resume (false `flag`) requesting.

        Pausing cancels all outstanding requests, withdraws interest from
        every peer and leaves endgame mode; resuming re-evaluates interest
        in random order and restarts requests on unchoked connections.
        """
        self.paused = flag
        if flag:
            for d in self.downloads:
                for index, begin, length in d.active_requests:
                    d.connection.send_cancel(index, begin, length)
                d._letgo()
                d.send_not_interested()
            if self.endgamemode:
                self._reset_endgame()
        else:
            shuffle(self.downloads)
            for d in self.downloads:
                d._check_interests()
                if d.interested and not d.choked:
                    d._request_more()