Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
3
 
4
from BitTornado.bitfield import Bitfield
5
from BitTornado.clock import clock
6
from binascii import b2a_hex
7
 
8
try:
9
    True
10
except:
11
    True = 1
12
    False = 0
13
 
14
DEBUG = False
15
 
16
def toint(s):
17
    return long(b2a_hex(s), 16)
18
 
19
def tobinary(i):
20
    return (chr(i >> 24) + chr((i >> 16) & 0xFF) + 
21
        chr((i >> 8) & 0xFF) + chr(i & 0xFF))
22
 
23
CHOKE = chr(0)
24
UNCHOKE = chr(1)
25
INTERESTED = chr(2)
26
NOT_INTERESTED = chr(3)
27
# index
28
HAVE = chr(4)
29
# index, bitfield
30
BITFIELD = chr(5)
31
# index, begin, length
32
REQUEST = chr(6)
33
# index, begin, piece
34
PIECE = chr(7)
35
# index, begin, piece
36
CANCEL = chr(8)
37
 
38
class Connection:
39
    def __init__(self, connection, connecter):
40
        self.connection = connection
41
        self.connecter = connecter
42
        self.got_anything = False
43
        self.next_upload = None
44
        self.outqueue = []
45
        self.partial_message = None
46
        self.download = None
47
        self.send_choke_queued = False
48
        self.just_unchoked = None
49
 
50
    def get_ip(self, real=False):
51
        return self.connection.get_ip(real)
52
 
53
    def get_id(self):
54
        return self.connection.get_id()
55
 
56
    def get_readable_id(self):
57
        return self.connection.get_readable_id()
58
 
59
    def close(self):
60
        if DEBUG:
61
            print 'connection closed'
62
        self.connection.close()
63
 
64
    def is_locally_initiated(self):
65
        return self.connection.is_locally_initiated()
66
 
67
    def send_interested(self):
68
        self._send_message(INTERESTED)
69
 
70
    def send_not_interested(self):
71
        self._send_message(NOT_INTERESTED)
72
 
73
    def send_choke(self):
74
        if self.partial_message:
75
            self.send_choke_queued = True
76
        else:
77
            self._send_message(CHOKE)
78
            self.upload.choke_sent()
79
            self.just_unchoked = 0
80
 
81
    def send_unchoke(self):
82
        if self.send_choke_queued:
83
            self.send_choke_queued = False
84
            if DEBUG:
85
                print 'CHOKE SUPPRESSED'
86
        else:
87
            self._send_message(UNCHOKE)
88
            if ( self.partial_message or self.just_unchoked is None
89
                 or not self.upload.interested or self.download.active_requests ):
90
                self.just_unchoked = 0
91
            else:
92
                self.just_unchoked = clock()
93
 
94
    def send_request(self, index, begin, length):
95
        self._send_message(REQUEST + tobinary(index) + 
96
            tobinary(begin) + tobinary(length))
97
        if DEBUG:
98
            print 'sent request: '+str(index)+': '+str(begin)+'-'+str(begin+length)
99
 
100
    def send_cancel(self, index, begin, length):
101
        self._send_message(CANCEL + tobinary(index) + 
102
            tobinary(begin) + tobinary(length))
103
        if DEBUG:
104
            print 'sent cancel: '+str(index)+': '+str(begin)+'-'+str(begin+length)
105
 
106
    def send_bitfield(self, bitfield):
107
        self._send_message(BITFIELD + bitfield)
108
 
109
    def send_have(self, index):
110
        self._send_message(HAVE + tobinary(index))
111
 
112
    def send_keepalive(self):
113
        self._send_message('')
114
 
115
    def _send_message(self, s):
116
        s = tobinary(len(s))+s
117
        if self.partial_message:
118
            self.outqueue.append(s)
119
        else:
120
            self.connection.send_message_raw(s)
121
 
122
    def send_partial(self, bytes):
123
        if self.connection.closed:
124
            return 0
125
        if self.partial_message is None:
126
            s = self.upload.get_upload_chunk()
127
            if s is None:
128
                return 0
129
            index, begin, piece = s
130
            self.partial_message = ''.join((
131
                            tobinary(len(piece) + 9), PIECE,
132
                            tobinary(index), tobinary(begin), piece.tostring() ))
133
            if DEBUG:
134
                print 'sending chunk: '+str(index)+': '+str(begin)+'-'+str(begin+len(piece))
135
 
136
        if bytes < len(self.partial_message):
137
            self.connection.send_message_raw(self.partial_message[:bytes])
138
            self.partial_message = self.partial_message[bytes:]
139
            return bytes
140
 
141
        q = [self.partial_message]
142
        self.partial_message = None
143
        if self.send_choke_queued:
144
            self.send_choke_queued = False
145
            self.outqueue.append(tobinary(1)+CHOKE)
146
            self.upload.choke_sent()
147
            self.just_unchoked = 0
148
        q.extend(self.outqueue)
149
        self.outqueue = []
150
        q = ''.join(q)
151
        self.connection.send_message_raw(q)
152
        return len(q)
153
 
154
    def get_upload(self):
155
        return self.upload
156
 
157
    def get_download(self):
158
        return self.download
159
 
160
    def set_download(self, download):
161
        self.download = download
162
 
163
    def backlogged(self):
164
        return not self.connection.is_flushed()
165
 
166
    def got_request(self, i, p, l):
167
        self.upload.got_request(i, p, l)
168
        if self.just_unchoked:
169
            self.connecter.ratelimiter.ping(clock() - self.just_unchoked)
170
            self.just_unchoked = 0
171
 
172
 
173
 
174
 
175
class Connecter:
176
    def __init__(self, make_upload, downloader, choker, numpieces,
177
            totalup, config, ratelimiter, sched = None):
178
        self.downloader = downloader
179
        self.make_upload = make_upload
180
        self.choker = choker
181
        self.numpieces = numpieces
182
        self.config = config
183
        self.ratelimiter = ratelimiter
184
        self.rate_capped = False
185
        self.sched = sched
186
        self.totalup = totalup
187
        self.rate_capped = False
188
        self.connections = {}
189
        self.external_connection_made = 0
190
 
191
    def how_many_connections(self):
192
        return len(self.connections)
193
 
194
    def connection_made(self, connection):
195
        c = Connection(connection, self)
196
        self.connections[connection] = c
197
        c.upload = self.make_upload(c, self.ratelimiter, self.totalup)
198
        c.download = self.downloader.make_download(c)
199
        self.choker.connection_made(c)
200
        return c
201
 
202
    def connection_lost(self, connection):
203
        c = self.connections[connection]
204
        del self.connections[connection]
205
        if c.download:
206
            c.download.disconnected()
207
        self.choker.connection_lost(c)
208
 
209
    def connection_flushed(self, connection):
210
        conn = self.connections[connection]
211
        if conn.next_upload is None and (conn.partial_message is not None
212
               or len(conn.upload.buffer) > 0):
213
            self.ratelimiter.queue(conn)
214
 
215
    def got_piece(self, i):
216
        for co in self.connections.values():
217
            co.send_have(i)
218
 
219
    def got_message(self, connection, message):
220
        c = self.connections[connection]
221
        t = message[0]
222
        if t == BITFIELD and c.got_anything:
223
            connection.close()
224
            return
225
        c.got_anything = True
226
        if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and 
227
                len(message) != 1):
228
            connection.close()
229
            return
230
        if t == CHOKE:
231
            c.download.got_choke()
232
        elif t == UNCHOKE:
233
            c.download.got_unchoke()
234
        elif t == INTERESTED:
235
            if not c.download.have.complete():
236
                c.upload.got_interested()
237
        elif t == NOT_INTERESTED:
238
            c.upload.got_not_interested()
239
        elif t == HAVE:
240
            if len(message) != 5:
241
                connection.close()
242
                return
243
            i = toint(message[1:])
244
            if i >= self.numpieces:
245
                connection.close()
246
                return
247
            if c.download.got_have(i):
248
                c.upload.got_not_interested()
249
        elif t == BITFIELD:
250
            try:
251
                b = Bitfield(self.numpieces, message[1:])
252
            except ValueError:
253
                connection.close()
254
                return
255
            if c.download.got_have_bitfield(b):
256
                c.upload.got_not_interested()
257
        elif t == REQUEST:
258
            if len(message) != 13:
259
                connection.close()
260
                return
261
            i = toint(message[1:5])
262
            if i >= self.numpieces:
263
                connection.close()
264
                return
265
            c.got_request(i, toint(message[5:9]), 
266
                toint(message[9:]))
267
        elif t == CANCEL:
268
            if len(message) != 13:
269
                connection.close()
270
                return
271
            i = toint(message[1:5])
272
            if i >= self.numpieces:
273
                connection.close()
274
                return
275
            c.upload.got_cancel(i, toint(message[5:9]), 
276
                toint(message[9:]))
277
        elif t == PIECE:
278
            if len(message) <= 9:
279
                connection.close()
280
                return
281
            i = toint(message[1:5])
282
            if i >= self.numpieces:
283
                connection.close()
284
                return
285
            if c.download.got_piece(i, toint(message[5:9]), message[9:]):
286
                self.got_piece(i)
287
        else:
288
            connection.close()