Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
# Written by Bram Cohen
2
# modified for multitracker operation by John Hoffman
3
# see LICENSE.txt for license information
4
 
5
from BitTornado.zurllib import urlopen, quote
6
from urlparse import urlparse, urlunparse
7
from socket import gethostbyname
8
from btformats import check_peers
9
from BitTornado.bencode import bdecode
10
from threading import Thread, Lock
11
from cStringIO import StringIO
12
from traceback import print_exc
13
from socket import error, gethostbyname
14
from random import shuffle
15
from sha import sha
16
from time import time
17
try:
    from os import getpid
except ImportError:
    # Fallback for platforms whose os module does not provide getpid():
    # a constant stands in for the process id.
    def getpid():
        """Return a fixed placeholder process id (always 1)."""
        return 1

# The historical "try: True / except: True = 1; False = 0" shim was removed:
# True and False are built-ins on every supported interpreter, the bare
# except hid unrelated errors, and assigning to True is a SyntaxError on
# Python 3, which prevented the module from compiling at all.
28
 
29
# 64-character alphabet used to render each key character (indexed by 6 bits).
mapbase64 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz.-'
# Cache of generated keys, indexed by tracker URL.
keys = {}
# Per-process seed: process id + start time + a fixed suffix.
basekeydata = str(getpid()) + repr(time()) + 'tracker'


def add_key(tracker):
    """Generate the 6-character key for *tracker* and store it in ``keys``.

    The key is derived from the last six bytes of the SHA-1 digest of
    ``basekeydata + tracker``; each byte is reduced to its low six bits and
    mapped through ``mapbase64``.
    """
    seed = basekeydata + tracker
    if not isinstance(seed, bytes):
        # Hash functions need bytes; text (unicode) input is encoded first.
        seed = seed.encode('utf-8')
    # bytearray yields integers on every interpreter, so no ord() is needed.
    tail = bytearray(sha(seed).digest()[-6:])
    keys[tracker] = ''.join([mapbase64[b & 0x3F] for b in tail])


def get_key(tracker):
    """Return the ``&key=...`` query fragment for *tracker*.

    The key is generated on first use and cached in ``keys`` afterwards.
    """
    if tracker not in keys:
        add_key(tracker)
    return "&key=" + keys[tracker]
45
 
46
class fakeflag:
    """Minimal stand-in for an event flag with a fixed state.

    Offers the ``isSet()`` / ``wait()`` pair; ``wait()`` never blocks.
    """

    def __init__(self, state=False):
        # The value reported by isSet(); it is never changed by this class.
        self.state = state

    def isSet(self):
        """Return the state given at construction time."""
        return self.state

    def wait(self):
        """Return immediately; there is nothing to wait for."""
        return None
53
 
54
class Rerequester:
    """Announce to a tiered list of trackers and pass returned peers on.

    Tracker requests run on worker threads.  Results, failures and callbacks
    are handed back through ``externalsched``; periodic timers are armed with
    ``sched``.  Event codes used throughout: 0 = started, 1 = completed,
    2 = stopped, 3 = regular announce (no ``event`` parameter is sent).
    """

    def __init__(self, trackerlist, interval, sched, howmany, minpeers,
            connect, externalsched, amount_left, up, down,
            port, ip, myid, infohash, timeout, errorfunc, excfunc,
            maxpeers, doneflag, upratefunc, downratefunc,
            unpauseflag = None,
            seed_id = '', seededfunc = None, force_rapid_update = False ):
        """Store settings and callbacks and build the constant query prefix.

        trackerlist -- list of tiers, each a list of announce URLs; tiers
            holding more than one URL are shuffled in place.
        unpauseflag -- object with ``isSet()``; when omitted, an always-set
            ``fakeflag`` is used.
        seed_id / seededfunc -- when given, add ``&seed_id=`` and
            ``&check_seeded=1`` to every request.
        The remaining arguments are stored unchanged on the instance.
        """
        if unpauseflag is None:
            # Built per instance rather than once at definition time.
            unpauseflag = fakeflag(True)

        self.excfunc = excfunc
        newtrackerlist = []
        for tier in trackerlist:
            if len(tier) > 1:
                shuffle(tier)
            newtrackerlist.append(tier)
        self.trackerlist = newtrackerlist
        self.lastsuccessful = ''
        self.rejectedmessage = 'rejected by tracker - '

        self.url = ('?info_hash=%s&peer_id=%s&port=%s' %
            (quote(infohash), quote(myid), str(port)))
        self.ip = ip
        self.interval = interval
        self.last = None
        self.trackerid = None
        self.announce_interval = 30 * 60
        self.sched = sched
        self.howmany = howmany
        self.minpeers = minpeers
        self.connect = connect
        self.externalsched = externalsched
        self.amount_left = amount_left
        self.up = up
        self.down = down
        self.timeout = timeout
        self.errorfunc = errorfunc
        self.maxpeers = maxpeers
        self.doneflag = doneflag
        self.upratefunc = upratefunc
        self.downratefunc = downratefunc
        self.unpauseflag = unpauseflag
        if seed_id:
            self.url += '&seed_id='+quote(seed_id)
        self.seededfunc = seededfunc
        if seededfunc:
            self.url += '&check_seeded=1'
        self.force_rapid_update = force_rapid_update
        self.last_failed = True
        self.never_succeeded = True
        self.errorcodes = {}
        self.lock = SuccessLock()
        self.special = None
        self.stopped = False

    def start(self):
        """Arm the peer-count check timer and send the 'started' announce."""
        self.sched(self.c, self.interval/2)
        self.d(0)

    def c(self):
        """Timer callback: announce early when short on peers, then re-arm.

        Does nothing once a 'stopped' announce has been sent.
        """
        if self.stopped:
            return
        # NOTE(review): this announces only while unpauseflag is NOT set,
        # whereas d() skips announcing in that state -- confirm the intended
        # polarity before changing it.
        if not self.unpauseflag.isSet() and (
            self.howmany() < self.minpeers or self.force_rapid_update ):
            self.announce(3, self._c)
        else:
            self._c()

    def _c(self):
        """Re-arm the peer-count check after ``self.interval``."""
        self.sched(self.c, self.interval)

    def d(self, event = 3):
        """Regular announce cycle; skipped (but rescheduled) while paused."""
        if self.stopped:
            return
        if not self.unpauseflag.isSet():
            self._d()
            return
        self.announce(event, self._d)

    def _d(self):
        """Schedule the next regular announce."""
        if self.never_succeeded:
            self.sched(self.d, 60)  # retry in 60 seconds
        elif self.force_rapid_update:
            return
        else:
            self.sched(self.d, self.announce_interval)

    def hit(self, event = 3):
        """Announce immediately under the same condition that c() uses."""
        # NOTE(review): same unpauseflag polarity question as in c().
        if not self.unpauseflag.isSet() and (
            self.howmany() < self.minpeers or self.force_rapid_update ):
            self.announce(event)

    def _peer_params(self):
        """Return the peer-count part of the query string.

        '&numwant=0' once ``howmany()`` has reached ``maxpeers``, otherwise a
        request for a compact peer list without peer ids.
        """
        if self.howmany() >= self.maxpeers:
            return '&numwant=0'
        return '&no_peer_id=1&compact=1'

    def announce(self, event = 3, callback = lambda: None, specialurl = None):
        """Build the announce query string and hand it to rerequest().

        event -- 0/1/2 add ``&event=started|completed|stopped``; 3 adds none.
            Event 2 also marks this object as stopped.
        callback -- called (via externalsched) when the cycle has ended.
        specialurl -- when given, only that tracker is contacted and dummy
            transfer statistics are sent.
        """
        if specialurl is not None:
            s = self.url+'&uploaded=0&downloaded=0&left=1'   # don't add to statistics
            s += self._peer_params()
            self.last_failed = True         # force true, so will display an error
            self.special = specialurl
            self.rerequest(s, callback)
            return

        s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
            (self.url, str(self.up()), str(self.down()),
            str(self.amount_left())))
        if self.last is not None:
            s += '&last=' + quote(str(self.last))
        if self.trackerid is not None:
            s += '&trackerid=' + quote(str(self.trackerid))
        s += self._peer_params()
        if event != 3:
            s += '&event=' + ['started', 'completed', 'stopped'][event]
        if event == 2:
            self.stopped = True
        self.rerequest(s, callback)

    def snoop(self, peers, callback = lambda: None):  # tracker call support
        """Send a statistics-neutral request asking for *peers* peers."""
        self.rerequest(self.url
            +'&event=stopped&port=0&uploaded=0&downloaded=0&left=1&tracker=1&numwant='
            +str(peers), callback)

    def rerequest(self, s, callback):
        """Start a request cycle on a worker thread.

        If the previous cycle has not finished yet, the same call is retried
        after five seconds instead.
        """
        if not self.lock.isfinished():  # still waiting for prior cycle to complete??
            def retry(self = self, s = s, callback = callback):
                self.rerequest(s, callback)
            self.sched(retry, 5)        # retry in 5 seconds
            return
        self.lock.reset()
        rq = Thread(target = self._rerequest, args = [s, callback])
        rq.daemon = False
        rq.start()

    def _rerequest(self, s, callback):
        """Worker-thread body of one cycle: try trackers until one handles it.

        A tracker that answers while it is not first in its tier is moved to
        the front of that tier.  When nothing succeeds, _fail() is scheduled.
        """
        try:
            def fail(self = self, callback = callback):
                self._fail(callback)
            # Start every cycle with a clean slate so that _fail() never
            # reports an error left over from an earlier cycle.
            self.errorcodes = {}
            if self.ip:
                try:
                    s += '&ip=' + gethostbyname(self.ip)
                except Exception:
                    self.errorcodes['troublecode'] = 'unable to resolve: '+self.ip
                    # A special-URL request is one-shot; do not let it leak
                    # into the next cycle.
                    self.special = None
                    self.externalsched(fail)
                    # Stop here: continuing would discard the error just
                    # recorded and could end the same cycle a second time.
                    return
            if self.special is None:
                for t, tier in enumerate(self.trackerlist):
                    for tr, tracker in enumerate(tier):
                        if self.rerequest_single(tracker, s, callback):
                            if not self.last_failed and tr != 0:
                                del tier[tr]
                                self.trackerlist[t] = [tracker] + tier
                            return
            else:
                tracker = self.special
                self.special = None
                if self.rerequest_single(tracker, s, callback):
                    return
            # no success from any tracker
            self.externalsched(fail)
        except Exception:
            self.exception(callback)

    def _fail(self, callback):
        """End a failed cycle: maybe report an error, then run *callback*.

        The error is reported only when both transfer rates are below 100 or
        nothing is left to download.  'rejected' takes precedence over
        'bad_data', which takes precedence over 'troublecode'.
        """
        if ( (self.upratefunc() < 100 and self.downratefunc() < 100)
             or not self.amount_left() ):
            for f in ['rejected', 'bad_data', 'troublecode']:
                if f in self.errorcodes:
                    r = self.errorcodes[f]
                    break
            else:
                r = 'Problem connecting to tracker - unspecified error'
            self.errorfunc(r)

        self.last_failed = True
        self.lock.give_up()
        self.externalsched(callback)

    def rerequest_single(self, t, s, callback):
        """Query tracker *t* on its own thread and wait for the attempt to end.

        Returns True when the caller should stop trying further trackers
        (success, or a silent failure of the tracker that worked last time),
        False to move on to the next tracker.
        """
        l = self.lock.set()
        rq = Thread(target = self._rerequest_single, args = [t, s+get_key(t), l, callback])
        rq.daemon = False
        rq.start()
        self.lock.wait()
        if self.lock.success:
            self.lastsuccessful = t
            self.last_failed = False
            self.never_succeeded = False
            return True
        if not self.last_failed and self.lastsuccessful == t:
            # if the last tracker hit was successful, and you've just tried the tracker
            # you'd contacted before, don't go any further, just fail silently.
            self.last_failed = True
            self.externalsched(callback)
            self.lock.give_up()
            return True
        return False    # returns true if it wants rerequest() to exit

    def _trip_failure(self, l, kind, message):
        """Record *message* under *kind* if this is the first result for attempt *l*."""
        if self.lock.trip(l):
            self.errorcodes[kind] = message
            self.lock.unwait(l)

    def _rerequest_single(self, t, s, l, callback):
        """Thread body for one tracker attempt: fetch, decode, validate, dispatch.

        *l* is the attempt code obtained from ``self.lock.set()``.  A timeout
        handler is scheduled for ``self.timeout``; whichever of timeout,
        failure or success trips the lock first decides the attempt.
        """
        try:
            closer = [None]
            def timedout(self = self, l = l, closer = closer):
                self._trip_failure(l, 'troublecode',
                    'Problem connecting to tracker - timeout exceeded')
                # Best effort: close the connection if one was opened.
                if closer[0] is not None:
                    try:
                        closer[0]()
                    except Exception:
                        pass

            self.externalsched(timedout, self.timeout)

            err = None
            h = None
            data = None
            try:
                h = urlopen(t+s)
                closer[0] = h.close
                data = h.read()
            except (IOError, error) as e:
                err = 'Problem connecting to tracker - ' + str(e)
            except Exception:
                err = 'Problem connecting to tracker'
            if h is not None:
                # Best effort: a failing close must not mask the real outcome.
                try:
                    h.close()
                except Exception:
                    pass
            if err:
                self._trip_failure(l, 'troublecode', err)
                return

            if not data:
                self._trip_failure(l, 'troublecode', 'no data from tracker')
                return

            try:
                r = bdecode(data, sloppy=1)
                check_peers(r)
            except ValueError as e:
                self._trip_failure(l, 'bad_data', 'bad data from tracker - ' + str(e))
                return

            if 'failure reason' in r:
                self._trip_failure(l, 'rejected',
                    self.rejectedmessage + r['failure reason'])
                return

            if self.lock.trip(l, True):     # success!
                self.lock.unwait(l)
            else:
                callback = lambda: None     # attempt timed out, don't do a callback

            # even if the attempt timed out, go ahead and process data
            def add(self = self, r = r, callback = callback):
                self.postrequest(r, callback)
            self.externalsched(add)
        except Exception:
            self.exception(callback)

    def postrequest(self, r, callback):
        """Apply a decoded tracker response *r*, then run *callback*.

        Updates the announce intervals and tracker id, converts the peer list
        (compact string of 6-byte entries, or list of dicts) into
        ``((ip, port), peer_id)`` tuples and passes them, shuffled, to
        ``connect``.  If the response is flagged 'seeded' and a seededfunc
        was supplied, that is called instead of connecting.
        """
        if 'warning message' in r:
            self.errorfunc('warning from tracker - ' + r['warning message'])
        self.announce_interval = r.get('interval', self.announce_interval)
        self.interval = r.get('min interval', self.interval)
        self.trackerid = r.get('tracker id', self.trackerid)
        self.last = r.get('last')
        p = r['peers']
        peers = []
        if isinstance(p, str):
            # Compact form: 4 address bytes followed by a big-endian port.
            for x in range(0, len(p), 6):
                ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
                port = (ord(p[x+4]) << 8) | ord(p[x+5])
                peers.append(((ip, port), 0))
        else:
            for x in p:
                peers.append(((x['ip'].strip(), x['port']), x.get('peer id', 0)))
        ps = len(peers) + self.howmany()
        if ps < self.maxpeers:
            # Forget 'last' when the tracker appears to know noticeably more
            # peers than we have (1000 is assumed when it does not say).
            if self.doneflag.isSet():
                if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
                    self.last = None
            else:
                if r.get('num peers', 1000) > ps * 1.2:
                    self.last = None
        if self.seededfunc and r.get('seeded'):
            self.seededfunc()
        elif peers:
            shuffle(peers)
            self.connect(peers)
        callback()

    def exception(self, callback):
        """Report the exception being handled, then run *callback*.

        The traceback text is captured immediately; reporting (through
        ``excfunc`` if set, otherwise printed) and the callback are deferred
        through ``externalsched``.
        """
        data = StringIO()
        print_exc(file = data)
        def r(s = data.getvalue(), callback = callback):
            if self.excfunc:
                self.excfunc(s)
            else:
                print(s)
            callback()
        self.externalsched(r)
372
 
373
 
374
class SuccessLock:
    """Coordinate one request cycle made of several numbered attempts.

    ``lock`` guards the bookkeeping fields; ``pause`` is the lock that
    wait() blocks on until unwait() releases it.  Every attempt gets a fresh
    ``code`` from set(); trip() lets exactly one party (result, failure or
    timeout) claim the outcome of the current attempt.
    """

    def __init__(self):
        self.lock = Lock()
        self.pause = Lock()
        self.code = 0           # number of the current attempt
        self.success = False    # True once an attempt succeeded in this cycle
        self.finished = True    # True while no cycle is in progress
        self.first = False      # True until the current attempt is tripped

    def reset(self):
        """Begin a new cycle: clear the success and finished flags."""
        self.success = False
        self.finished = False

    def set(self):
        """Begin a new attempt and return its code.

        Makes sure ``pause`` is held so that a following wait() blocks until
        unwait() is called.
        """
        self.lock.acquire()
        try:
            if not self.pause.locked():
                self.pause.acquire()
            self.first = True
            self.code += 1
            # Read the code while still holding the lock so the caller gets
            # the value belonging to this very attempt.
            return self.code
        finally:
            self.lock.release()

    def trip(self, code, s = False):
        """Try to claim the outcome of attempt *code*.

        Returns True only for the first caller on the current attempt of an
        unfinished cycle, otherwise False.  With ``s`` true, a first caller
        also marks the cycle finished and successful; a late success (after
        the attempt was already tripped, e.g. by its timeout) changes
        nothing, so the cycle can still be completed by whoever took over.
        """
        self.lock.acquire()
        try:
            if code != self.code or self.finished:
                return False
            r = self.first
            self.first = False
            if s and r:
                self.finished = True
                self.success = True
            return r
        finally:
            self.lock.release()

    def give_up(self):
        """Mark the cycle finished without success."""
        self.lock.acquire()
        try:
            self.success = False
            self.finished = True
        finally:
            self.lock.release()

    def wait(self):
        """Block until ``pause`` can be acquired, i.e. until unwait() ran."""
        self.pause.acquire()

    def unwait(self, code):
        """Release a waiter, but only on behalf of the current attempt."""
        if code == self.code and self.pause.locked():
            self.pause.release()

    def isfinished(self):
        """Return whether no cycle is currently in progress."""
        self.lock.acquire()
        try:
            return self.finished
        finally:
            self.lock.release()