# Written by John Hoffman
# derived from NATPortMapping.py by Yejun Yang
# and from example code by Myers Carpenter
# see LICENSE.txt for license information
|
import socket |
from traceback import print_exc |
from subnetparse import IP_List |
from clock import clock |
from __init__ import createPeerID |
# Compatibility shim: if the interpreter has no builtin True/False
# (referencing the name fails), define them as the integers 1 and 0.
try:
    True
except:
    True = 1
    False = 0
|
# When true, the classes below print progress messages and tracebacks.
DEBUG = False

# How long (in seconds, measured with clock()) a cached COM object or
# local IP address is reused before it is looked up again.
EXPIRE_CACHE = 30 # seconds
# Description string attached to every port mapping this module creates;
# _UPnP1.clean() later recognises its own mappings by the "bt-" prefix.
ID = "BT-"+createPeerID()[-4:]
|
# The pywin32 COM modules are optional.  _supported records whether they
# could be imported; _UPnP.test() and _UPnP1.clean() check it before
# touching COM.
try:
    import pythoncom, win32com.client
    _supported = 1
except ImportError:
    _supported = 0
|
|
|
class _UPnP1:   # derived from Myers Carpenter's code
    """Port mapper built on the "HNetCfg.NATUPnP" COM object.

    Seems to use the machine's local UPnP system for its operation.
    Runs fairly fast.

    Every COM call is wrapped in a bare ``except`` on purpose: the methods
    are best-effort and report failure through their return value instead
    of raising, whatever kind of error the COM layer produces.
    """

    def __init__(self):
        # Cached StaticPortMappingCollection, or None if none was obtained.
        self.map = None
        # clock() value of the last successful fetch; the huge negative
        # start value forces a fetch on first use.
        self.last_got_map = -10e10

    def _get_map(self):
        """Return the static port mapping collection, or None.

        The collection is fetched again once the cached one is older than
        EXPIRE_CACHE seconds.  A failed fetch clears the cache and leaves
        the timestamp untouched, so the next call tries again.
        """
        if self.last_got_map + EXPIRE_CACHE < clock():
            try:
                dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
                self.map = dispatcher.StaticPortMappingCollection
                self.last_got_map = clock()
            except:
                self.map = None
        return self.map

    def test(self):
        """Return True if a port mapping collection could be obtained.

        The check is an ordinary ``if`` rather than an ``assert``: assert
        statements are removed under ``python -O``, which would make this
        method report success unconditionally.
        """
        try:
            if self._get_map():     # make sure a map was found
                success = True
            else:
                success = False
        except:
            success = False
        return success

    def open(self, ip, p):
        """Forward external TCP port ``p`` to ``ip``:``p``.

        The mapping is added enabled and described by the module-level ID.
        Returns True on success, False on any failure (including no
        mapping collection being available).
        """
        mappings = self._get_map()
        try:
            # Add(external port, protocol, internal port, internal client,
            #     enabled, description)
            mappings.Add(p, 'TCP', p, ip, True, ID)
            if DEBUG:
                print('port opened: '+ip+':'+str(p))
            success = True
        except:
            if DEBUG:
                print("COULDN'T OPEN "+str(p))
                print_exc()
            success = False
        return success

    def close(self, p):
        """Remove the TCP mapping for external port ``p``.

        Returns True on success, False on any failure.
        """
        mappings = self._get_map()
        try:
            mappings.Remove(p, 'TCP')
            success = True
            if DEBUG:
                print('port closed: '+str(p))
        except:
            if DEBUG:
                print('ERROR CLOSING '+str(p))
                print_exc()
            success = False
        return success

    def clean(self, retry = False):
        """Remove all TCP mappings whose description starts with "bt-".

        The comparison is case-insensitive, so it matches the "BT-..."
        descriptions written by open().  If any removal fails, the whole
        pass is repeated once (``retry`` marks that second pass).  Does
        nothing when COM support is unavailable; all errors are swallowed
        and nothing is returned.
        """
        if not _supported:
            return
        try:
            mappings = self._get_map()
            # Collect first, remove afterwards, so the collection is not
            # modified while it is being indexed.
            ports_in_use = []
            for i in range(len(mappings)):
                try:
                    mapping = mappings[i]
                    port = mapping.ExternalPort
                    prot = str(mapping.Protocol).lower()
                    desc = str(mapping.Description).lower()
                except:
                    # Unreadable entry: port = None makes the test below
                    # short-circuit before prot/desc are looked at.
                    port = None
                if port and prot == 'tcp' and desc.startswith('bt-'):
                    ports_in_use.append(port)
            success = True
            for port in ports_in_use:
                try:
                    mappings.Remove(port, 'TCP')
                except:
                    success = False
            if not success and not retry:
                self.clean(retry = True)
        except:
            pass
|
|
class _UPnP2:   # derived from Yejun Yang's code
    """Port mapper built on the "UPnP.UPnPDeviceFinder" COM object.

    Apparently does a direct search for UPnP hardware.  May work in some
    cases where _UPnP1 won't, but is slow.
    Still need to implement "clean" method.

    As in _UPnP1, COM calls are wrapped in bare ``except`` clauses on
    purpose: failures are reported through return values, not raised.
    """

    def __init__(self):
        # Cached list of service objects, or None before the first search.
        self.services = None
        # clock() value of the last search; the huge negative start value
        # forces a search on first use.
        self.last_got_services = -10e10

    def _get_services(self):
        """Return the list of WAN connection services found (maybe empty).

        A new search is made when the cached list is empty or older than
        EXPIRE_CACHE seconds.  Errors at any level only skip the item that
        caused them, so a partial list can be returned.
        """
        if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
            self.services = []
            try:
                finder = win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
                for t in ( "urn:schemas-upnp-org:service:WANIPConnection:1",
                           "urn:schemas-upnp-org:service:WANPPPConnection:1" ):
                    try:
                        conns = finder.FindByType(t, 0)
                        for c in range(len(conns)):
                            try:
                                svcs = conns[c].Services
                                for s in range(len(svcs)):
                                    try:
                                        self.services.append(svcs[s])
                                    except:
                                        pass
                            except:
                                pass
                    except:
                        pass
            except:
                pass
            self.last_got_services = clock()
        return self.services

    def test(self):
        """Return True if at least one service could be found.

        The check is an ordinary ``if`` rather than an ``assert``: assert
        statements are removed under ``python -O``, which would make this
        method report success unconditionally.
        """
        try:
            if self._get_services():    # make sure some services can be found
                success = True
            else:
                success = False
        except:
            success = False
        return success

    def open(self, ip, p):
        """Ask every known service to forward TCP port ``p`` to ``ip``:``p``.

        Returns True if at least one service accepted the request.
        """
        svcs = self._get_services()
        success = False
        for s in svcs:
            try:
                # Arguments: remote host, external port, protocol,
                # internal port, internal client, enabled, description,
                # lease duration.
                s.InvokeAction('AddPortMapping',
                               ['', p, 'TCP', p, ip, True, ID, 0], '')
                success = True
            except:
                pass
        if DEBUG and not success:
            print("COULDN'T OPEN "+str(p))
            # Prints the most recent exception handled in the loop above.
            print_exc()
        return success

    def close(self, p):
        """Ask every known service to delete the TCP mapping for port ``p``.

        Returns True if at least one service accepted the request.
        """
        svcs = self._get_services()
        success = False
        for s in svcs:
            try:
                # Arguments: remote host, external port, protocol.
                s.InvokeAction('DeletePortMapping', ['', p, 'TCP'], '')
                success = True
            except:
                pass
        if DEBUG and not success:
            # Same wording as _UPnP1.close(); this message used to claim
            # the port could not be *opened*.
            print('ERROR CLOSING '+str(p))
            # Prints the most recent exception handled in the loop above.
            print_exc()
        return success
|
|
class _UPnP: # master holding class |
def __init__(self): |
self.upnp1 = _UPnP1() |
self.upnp2 = _UPnP2() |
self.upnplist = (None, self.upnp1, self.upnp2) |
self.upnp = None |
self.local_ip = None |
self.last_got_ip = -10e10 |
|
def get_ip(self): |
if self.last_got_ip + EXPIRE_CACHE < clock(): |
local_ips = IP_List() |
local_ips.set_intranet_addresses() |
try: |
for info in socket.getaddrinfo(socket.gethostname(),0,socket.AF_INET): |
# exception if socket library isn't recent |
self.local_ip = info[4][0] |
if local_ips.includes(self.local_ip): |
self.last_got_ip = clock() |
if DEBUG: |
print 'Local IP found: '+self.local_ip |
break |
else: |
raise ValueError('couldn\'t find intranet IP') |
except: |
self.local_ip = None |
if DEBUG: |
print 'Error finding local IP' |
print_exc() |
return self.local_ip |
|
def test(self, upnp_type): |
if DEBUG: |
print 'testing UPnP type '+str(upnp_type) |
if not upnp_type or not _supported or self.get_ip() is None: |
if DEBUG: |
print 'not supported' |
return 0 |
pythoncom.CoInitialize() # leave initialized |
self.upnp = self.upnplist[upnp_type] # cache this |
if self.upnp.test(): |
if DEBUG: |
print 'ok' |
return upnp_type |
if DEBUG: |
print 'tested bad' |
return 0 |
|
def open(self, p): |
assert self.upnp, "must run UPnP_test() with the desired UPnP access type first" |
return self.upnp.open(self.get_ip(), p) |
|
def close(self, p): |
assert self.upnp, "must run UPnP_test() with the desired UPnP access type first" |
return self.upnp.close(p) |
|
def clean(self): |
return self.upnp1.clean() |
|
# Single shared instance; the module's public functions are its bound
# methods.
_upnp_ = _UPnP()

UPnP_test = _upnp_.test             # select and test an access type
UPnP_open_port = _upnp_.open        # requires a prior UPnP_test()
UPnP_close_port = _upnp_.close      # requires a prior UPnP_test()
UPnP_reset = _upnp_.clean           # remove leftover "bt-" mappings
|