Subversion Repositories svnkaklik

Compare Revisions

Ignore whitespace Rev 35 → Rev 36

/web/kaklik's_web/torrentflux/TF_BitTornado/BitTornado/natpunch.py
0,0 → 1,254
# Written by John Hoffman
# derived from NATPortMapping.py by Yejun Yang
# and from example code by Myers Carpenter
# see LICENSE.txt for license information
 
import socket
from traceback import print_exc
from subnetparse import IP_List
from clock import clock
from __init__ import createPeerID
# Compatibility shim for very old interpreters that lack the True/False
# builtins.  Only a missing name (NameError) triggers the fallback.
try:
    True
except NameError:
    # Assigned through globals() rather than "True = 1" so that this module
    # still byte-compiles on interpreters where True/False are keywords;
    # at module level the effect is the same as a plain assignment.
    globals()['True'] = 1
    globals()['False'] = 0
 
# When true, the classes below print progress messages and tracebacks.
DEBUG = False

# How long (in seconds) cached COM lookups and the cached local IP stay valid.
EXPIRE_CACHE = 30

# Description string passed along when a port mapping is added; the cleanup
# code in this module looks for the (case-insensitive) "bt-" prefix.
ID = "BT-" + createPeerID()[-4:]

# Probe for the COM bindings.  _supported is 1 when both modules import
# successfully and 0 otherwise.
try:
    import pythoncom
    import win32com.client
except ImportError:
    _supported = 0
else:
    _supported = 1
 
 
 
class _UPnP1:   # derived from Myers Carpenter's code
    """Port mapper built on the "HNetCfg.NATUPnP" COM object.

    Seems to use the machine's local UPnP system for its operation.
    Runs fairly fast.
    """

    def __init__(self):
        # Cached StaticPortMappingCollection (None until fetched, or when
        # the lookup failed).
        self.map = None
        # clock() value of the last successful fetch; the large negative
        # start value forces a lookup on first use.
        self.last_got_map = -10e10

    def _get_map(self):
        """Return the port-mapping collection, refreshing a stale cache.

        The collection is re-fetched once more than EXPIRE_CACHE seconds
        have passed since the last successful fetch.  If the COM lookup
        fails the cache is reset to None and the timestamp is left alone,
        so the next call tries again.
        """
        if self.last_got_map + EXPIRE_CACHE < clock():
            try:
                dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
                self.map = dispatcher.StaticPortMappingCollection
                self.last_got_map = clock()
            except Exception:
                self.map = None
        return self.map

    def test(self):
        """Return True if a (truthy) mapping collection is available.

        Uses an explicit truth test rather than ``assert`` so the result
        stays correct when Python runs with -O (which strips asserts).
        """
        try:
            if self._get_map():     # make sure a map was found
                return True
            return False
        except Exception:
            return False

    def open(self, ip, p):
        """Add a TCP mapping of external port p to ip:p.

        Returns True on success, False if the mapping could not be added
        (including when no mapping collection is available).
        """
        mappings = self._get_map()
        try:
            # With no collection (None) this raises AttributeError and is
            # reported as a failure like any COM error.
            mappings.Add(p, 'TCP', p, ip, True, ID)
            if DEBUG:
                print('port opened: '+ip+':'+str(p))
            success = True
        except Exception:
            if DEBUG:
                print("COULDN'T OPEN "+str(p))
                print_exc()
            success = False
        return success

    def close(self, p):
        """Remove the TCP mapping for external port p.

        Returns True on success, False otherwise.
        """
        mappings = self._get_map()
        try:
            mappings.Remove(p, 'TCP')
            success = True
            if DEBUG:
                print('port closed: '+str(p))
        except Exception:
            if DEBUG:
                print('ERROR CLOSING '+str(p))
                print_exc()
            success = False
        return success

    def clean(self, retry = False):
        """Best-effort removal of every TCP mapping tagged "bt-".

        Scans the mapping collection for TCP entries whose description
        starts with "bt-" (case-insensitive) and removes them.  If any
        removal fails, the whole pass is repeated once (retry=True marks
        the second pass).  Never raises for ordinary errors; returns None.
        """
        if not _supported:
            return
        try:
            mappings = self._get_map()
            ports_in_use = []
            for i in range(len(mappings)):
                try:
                    mapping = mappings[i]
                    port = mapping.ExternalPort
                    prot = str(mapping.Protocol).lower()
                    desc = str(mapping.Description).lower()
                except Exception:
                    # Unreadable entry: skip it instead of testing values
                    # left over from a previous iteration.
                    continue
                if port and prot == 'tcp' and desc[:3] == 'bt-':
                    ports_in_use.append(port)
            success = True
            for port in ports_in_use:
                try:
                    mappings.Remove(port, 'TCP')
                except Exception:
                    success = False
            if not success and not retry:
                self.clean(retry = True)
        except Exception:
            # Deliberately best-effort (e.g. no collection available);
            # only surface the error when debugging.
            if DEBUG:
                print_exc()
 
 
class _UPnP2:   # derived from Yejun Yang's code
    """Port mapper built on the "UPnP.UPnPDeviceFinder" COM object.

    Apparently does a direct search for UPnP hardware.  May work in some
    cases where _UPnP1 won't, but is slow.

    TODO: still need to implement "clean" method.
    """

    # Device types searched for connection services.
    _SERVICE_TYPES = ("urn:schemas-upnp-org:service:WANIPConnection:1",
                      "urn:schemas-upnp-org:service:WANPPPConnection:1")

    def __init__(self):
        # Cached list of service objects (None until the first search).
        self.services = None
        # clock() value of the last search; the large negative start value
        # forces a search on first use.
        self.last_got_services = -10e10

    def _get_services(self):
        """Return the list of discovered services, searching if needed.

        A new search runs when the cached list is empty/None or older than
        EXPIRE_CACHE seconds, so an empty result is never reused.  Every
        COM call is individually guarded: a failure skips that device or
        service and the search continues with the rest.
        """
        if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
            self.services = []
            try:
                finder = win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
                for service_type in self._SERVICE_TYPES:
                    try:
                        conns = finder.FindByType(service_type, 0)
                        for c in range(len(conns)):
                            try:
                                svcs = conns[c].Services
                                for s in range(len(svcs)):
                                    try:
                                        self.services.append(svcs[s])
                                    except Exception:
                                        pass    # skip unreadable service
                            except Exception:
                                pass    # skip unreadable device
                    except Exception:
                        pass    # this device type could not be searched
            except Exception:
                pass    # finder unavailable: leave the list empty
            self.last_got_services = clock()
        return self.services

    def test(self):
        """Return True if at least one service was found.

        Uses an explicit truth test rather than ``assert`` so the result
        stays correct when Python runs with -O (which strips asserts).
        """
        try:
            if self._get_services():    # make sure some services can be found
                return True
            return False
        except Exception:
            return False

    def open(self, ip, p):
        """Ask every known service to map TCP port p to ip:p.

        Returns True if at least one service accepted the request.
        """
        success = False
        for s in self._get_services():
            try:
                # NOTE(review): argument order presumably follows the UPnP
                # AddPortMapping action (remote host, external port,
                # protocol, internal port, internal client, enabled,
                # description, lease duration) -- confirm against the spec.
                s.InvokeAction('AddPortMapping', ['', p, 'TCP', p, ip, True, ID, 0], '')
                success = True
            except Exception:
                # Print while the exception is still being handled; once
                # the except block ends there may be nothing to print.
                if DEBUG:
                    print_exc()
        if DEBUG and not success:
            print("COULDN'T OPEN "+str(p))
        return success

    def close(self, p):
        """Ask every known service to delete the TCP mapping for port p.

        Returns True if at least one service accepted the request.
        """
        success = False
        for s in self._get_services():
            try:
                s.InvokeAction('DeletePortMapping', ['', p, 'TCP'], '')
                success = True
            except Exception:
                if DEBUG:
                    print_exc()
        if DEBUG and not success:
            # Previously said "COULDN'T OPEN", copied from open().
            print("COULDN'T CLOSE "+str(p))
        return success
 
 
class _UPnP:    # master holding class
    """Front end that selects one of the two UPnP back ends.

    test() must be called with the desired access type before open() or
    close(); it picks and caches the back end those methods use.
    """

    def __init__(self):
        self.upnp1 = _UPnP1()
        self.upnp2 = _UPnP2()
        # Indexed by access type: 0 = none, 1 = _UPnP1, 2 = _UPnP2.
        self.upnplist = (None, self.upnp1, self.upnp2)
        self.upnp = None            # back end chosen by test()
        self.local_ip = None        # cached local address, or None
        # clock() value of the last successful lookup; the large negative
        # start value forces a lookup on first use.
        self.last_got_ip = -10e10

    def get_ip(self):
        """Return this host's local IPv4 address, or None if none is found.

        The host name's AF_INET addresses are checked in order and the
        first one accepted by an IP_List primed with
        set_intranet_addresses() is used.  A successful result is cached
        for EXPIRE_CACHE seconds; failures are not cached.
        """
        if self.last_got_ip + EXPIRE_CACHE < clock():
            local_ips = IP_List()
            local_ips.set_intranet_addresses()
            try:
                for info in socket.getaddrinfo(socket.gethostname(), 0, socket.AF_INET):
                    # exception if socket library isn't recent
                    self.local_ip = info[4][0]
                    if local_ips.includes(self.local_ip):
                        self.last_got_ip = clock()
                        if DEBUG:
                            print('Local IP found: '+self.local_ip)
                        break
                else:
                    # Loop finished without a match.
                    raise ValueError("couldn't find intranet IP")
            except Exception:
                self.local_ip = None
                if DEBUG:
                    print('Error finding local IP')
                    print_exc()
        return self.local_ip

    def test(self, upnp_type):
        """Select back end upnp_type and check that it works.

        Returns upnp_type if the back end's own test() succeeds, and 0 if
        UPnP is disabled (falsy type), the type is not a valid index into
        the back-end table, COM support is missing, no local IP could be
        found, or the back end tested bad.
        """
        if DEBUG:
            print('testing UPnP type '+str(upnp_type))
        # Only the real back-end slots are accepted; this also rejects
        # 0/None as well as out-of-range and negative values, which would
        # otherwise raise IndexError or silently pick the wrong entry.
        valid_type = upnp_type in range(1, len(self.upnplist))
        if not valid_type or not _supported or self.get_ip() is None:
            if DEBUG:
                print('not supported')
            return 0
        pythoncom.CoInitialize()                # leave initialized
        self.upnp = self.upnplist[upnp_type]    # cache this
        if self.upnp.test():
            if DEBUG:
                print('ok')
            return upnp_type
        if DEBUG:
            print('tested bad')
        return 0

    def open(self, p):
        """Open port p on the selected back end; returns its success flag.

        Raises AssertionError if test() has not selected a back end.  The
        check is an explicit raise so it still fires under ``python -O``.
        """
        if not self.upnp:
            raise AssertionError("must run UPnP_test() with the desired UPnP access type first")
        return self.upnp.open(self.get_ip(), p)

    def close(self, p):
        """Close port p on the selected back end; returns its success flag.

        Raises AssertionError if test() has not selected a back end.  The
        check is an explicit raise so it still fires under ``python -O``.
        """
        if not self.upnp:
            raise AssertionError("must run UPnP_test() with the desired UPnP access type first")
        return self.upnp.close(p)

    def clean(self):
        """Run the _UPnP1 back end's cleanup; only that back end has one."""
        return self.upnp1.clean()
 
# Single shared _UPnP instance; created when this module is imported.
_upnp_ = _UPnP()
 
# Public module-level API: bound methods of the shared instance above.
# UPnP_test(upnp_type) must be called before UPnP_open_port/UPnP_close_port,
# which otherwise fail their "must run UPnP_test()" assertion.
UPnP_test = _upnp_.test
UPnP_open_port = _upnp_.open
UPnP_close_port = _upnp_.close
# Delegates to the _UPnP1 back end's clean().
UPnP_reset = _upnp_.clean