1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
#!/usr/bin/env python2
# Program to detect Ceton InfiniTV tuners on the network
# Program will only listen on a specific interface for a set amount of time.
# If during that time it finds a device it will print out the location.
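# Usage: discover_infinitv $ip [$timeout]
#   $ip is the address of the local interface to listen on; $timeout is the
#   number of seconds to listen (defaults to 5 when omitted or not a number).
# ex: discover_infinitv 192.168.200.2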
import socket
import thread
import time,sys
class find_the_infiniTV(object):
    """Listen for SSDP NOTIFY datagrams and report Ceton InfiniTV tuners.

    Creating an instance opens a UDP socket bound to ``UPNP_PORT``, joins the
    multicast group ``UPNP_GROUP`` on the interface whose address is
    ``interface_ip`` and sends one M-SEARCH request.  ``listen`` then reads
    datagrams until ``stop_listen`` is called.

    Attributes:
        sock: the UDP socket used for sending and receiving.
        devices: dict keyed (and valued) by the USN of every tuner reported.
        listening: flag polled by ``listen``; cleared by ``stop_listen``.
    """

    def __init__(self, UPNP_GROUP="239.255.255.250", UPNP_PORT=1900, interface_ip='127.0.0.1'):
        """Open the socket, join the multicast group and send an M-SEARCH.

        Args:
            UPNP_GROUP: multicast group address to join and to search on.
            UPNP_PORT: UDP port to bind to and to send the search to.
            interface_ip: address of the local interface used for the
                multicast membership.

        Raises:
            Whatever the socket calls raise; the socket is closed first so a
            failed setup does not leak the descriptor.
        """
        self.upnp_group = UPNP_GROUP
        self.upnp_port = UPNP_PORT
        self.devices = {}
        self.listening = False
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind(('', UPNP_PORT))
            mreq = socket.inet_aton(UPNP_GROUP) + socket.inet_aton(interface_ip)
            self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            self.emit_msearch()
        except Exception:
            # Release the descriptor before propagating the setup failure.
            self.sock.close()
            raise

    def emit_msearch(self):
        """Send a single M-SEARCH request to the configured group and port."""
        msearch_string = (
            'M-SEARCH * HTTP/1.1\r\n'
            'HOST: {0}:{1}\r\n'
            'MAN: "ssdp:discover"\r\n'
            'MX: 2\r\n'
            'ST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n'
            '\r\n'
        ).format(self.upnp_group, self.upnp_port)
        self.sock.sendto(msearch_string.encode(), (self.upnp_group, self.upnp_port))

    # Internally used method to read a notification
    def _listen_notification(self, data):
        """Parse one NOTIFY datagram and report a newly seen InfiniTV.

        Args:
            data: the decoded text of the datagram.

        A device is recorded in ``self.devices`` and its location printed
        only the first time its USN is seen.  Notifications that lack a USN
        containing "CetonFactory", or that lack a LOCATION header (for
        example byebye messages), are ignored so that a later complete
        notification can still be reported.
        """
        # partition never raises, whether the blank-line separator is
        # missing or appears more than once.
        upnp_header = data.partition("\r\n\r\n")[0]

        header = {}
        # The first line is the request line, not a header.
        for frag in upnp_header.split("\r\n")[1:]:
            name, colon, value = frag.partition(":")
            if not colon:
                continue  # not a "name: value" line
            # Header names are case-insensitive; the space after the colon
            # is optional.
            header[name.strip().upper()] = value.strip()

        usn = header.get("USN")
        location = header.get("LOCATION")
        if usn is None or "CetonFactory" not in usn:
            return
        if location is None or usn in self.devices:
            return
        self.devices[usn] = usn
        print("Found InfiniTV. Location URL: {0}".format(location))

    def listen(self):
        """Read datagrams until ``stop_listen`` clears ``self.listening``.

        The socket is polled with a short timeout so the loop notices the
        flag change instead of blocking in ``recv`` indefinitely.
        """
        import select
        self.listening = True
        while self.listening:
            readable, _, _ = select.select([self.sock], [], [], 0.5)
            if not readable:
                continue
            # 'replace' keeps one malformed datagram from ending the loop.
            upnp_data = self.sock.recv(10240).decode('utf-8', 'replace')
            # Filter by type (we only care about NOTIFY for now)
            # NOTE(review): replies to the M-SEARCH sent by emit_msearch do
            # not start with "NOTIFY" and are therefore dropped here.
            if upnp_data.startswith("NOTIFY"):
                self._listen_notification(upnp_data)

    def stop_listen(self):
        """Ask ``listen`` to stop and report when no tuner was found."""
        self.listening = False
        if not self.devices:
            print("No InfiniTV found")
if __name__ == "__main__":
    # The interface address is mandatory: print the usage line and exit
    # with status 1 when it is missing.
    try:
        interface_ip = sys.argv[1]
    except IndexError:
        print("detect_infinitv $ip_address")
        sys.exit(1)

    # The timeout (seconds) is optional; fall back to 5 when it is absent
    # or is not a number.
    try:
        timeout = float(sys.argv[2])
    except (IndexError, ValueError):
        timeout = 5

    # Create the UPnP socket, listen on a background thread for `timeout`
    # seconds, then stop and report if nothing was found.
    L = find_the_infiniTV(interface_ip=interface_ip)
    thread.start_new_thread(L.listen, ())
    time.sleep(timeout)
    L.stop_listen()
|