summaryrefslogtreecommitdiffstats
path: root/abs/core/LinHES-config/discover_infinitv.py
blob: 421ba8437ead0c313eceaae29e112a6e299c762a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python2
#   Detect Ceton InfiniTV tuners on the network.
#   The program listens on one specific interface for a set amount of time.
#   For every InfiniTV it hears about during that time it prints the
#   device's location URL; if none is found it says so when the time is up.
#   Usage: discover_infinitv $ip [$timeout]   ($timeout in seconds, default 15)
#   ex:    discover_infinitv 192.168.200.2

import socket
import thread
import time,sys


class find_the_infiniTV(object):
    """Listen for UPnP/SSDP announcements and report Ceton InfiniTV tuners.

    Creating an instance opens a UDP socket bound to the UPnP port, joins the
    UPnP multicast group on the interface owning ``interface_ip`` and sends a
    single M-SEARCH request.  ``listen()`` then reads datagrams and prints the
    location URL of every device whose USN contains ``CetonFactory``.

    Only ``NOTIFY`` datagrams are evaluated; other traffic arriving on the
    socket (including replies to the M-SEARCH) is ignored.

    Attributes:
        sock: the bound UDP socket joined to the multicast group.
        devices: dict keyed (and valued) by the USN of every reported device.
        listening: flag polled by ``listen()``; cleared by ``stop_listen()``.
        upnp_group / upnp_port: multicast destination used for the M-SEARCH.
    """

    def __init__(self, UPNP_GROUP="239.255.255.250", UPNP_PORT=1900, interface_ip='127.0.0.1'):
        """Open the socket, join the multicast group and send an M-SEARCH.

        Args:
            UPNP_GROUP: dotted-quad multicast group to join and search on.
            UPNP_PORT: UDP port to bind to and to send the search to.
            interface_ip: address of the local interface on which the
                multicast group is joined.

        Raises:
            socket.error: if the socket cannot be bound, the group cannot be
                joined or the search cannot be sent.
        """
        self.upnp_group = UPNP_GROUP
        self.upnp_port = UPNP_PORT
        self.devices = {}
        self.listening = False

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('', UPNP_PORT))
            # ip_mreq layout: multicast group address followed by the address
            # of the local interface that should join it.
            mreq = socket.inet_aton(UPNP_GROUP) + socket.inet_aton(interface_ip)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        except Exception:
            # Do not leak the descriptor when setup fails half way.
            sock.close()
            raise
        self.sock = sock
        self.emit_msearch()

    def emit_msearch(self):
        """Send one SSDP M-SEARCH request to the configured group and port."""
        msearch_string = (
            'M-SEARCH * HTTP/1.1\r\n'
            'HOST: {0}:{1}\r\n'
            'MAN: "ssdp:discover"\r\n'
            'MX: 2\r\n'
            'ST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n'
            '\r\n'
        ).format(self.upnp_group, self.upnp_port)
        self.sock.sendto(msearch_string.encode('ascii'),
                         (self.upnp_group, self.upnp_port))

    # Internally used method to read a notification
    def _listen_notification(self, data):
        """Parse one NOTIFY datagram and report a newly seen InfiniTV.

        Args:
            data: the decoded text of the datagram.

        A device is reported (printed and stored in ``self.devices``) once
        per USN, and only when the datagram carries both a USN containing
        ``CetonFactory`` and a non-empty LOCATION header.  Malformed header
        lines are skipped instead of raising.
        """
        # Everything up to the first blank line is the header section; a
        # missing blank line or further blank lines in the body are harmless.
        upnp_header = data.partition("\r\n\r\n")[0]

        header = {}
        # The first line is the request line ("NOTIFY * HTTP/1.1"), not a header.
        for frag in upnp_header.split("\r\n")[1:]:
            name, sep, value = frag.partition(":")
            if not sep:
                continue
            # Header names are matched case-insensitively and the space
            # after the colon is optional.
            header[name.strip().upper()] = value.strip()

        usn = header.get("USN")
        if usn is None or "CetonFactory" not in usn:
            return
        if usn in self.devices:
            return

        location = header.get("LOCATION")
        if not location:
            # Nothing useful to report yet; leave the device unrecorded so a
            # later, complete announcement is still printed.
            return

        self.devices[usn] = usn
        print("Found InfiniTV. Location URL: {0}".format(location))

    def listen(self):
        """Read datagrams until ``stop_listen()`` clears the listening flag.

        ``recv`` blocks, so the flag is only re-checked after the next
        datagram arrives.
        """
        self.listening = True
        while self.listening:
            # Undecodable bytes are replaced instead of raising, so a single
            # odd packet cannot end the loop.
            upnp_data = self.sock.recv(10240).decode('utf-8', 'replace')
            # Filter by type (we only care about NOTIFY for now)
            if upnp_data.startswith("NOTIFY"):
                self._listen_notification(upnp_data)

    def stop_listen(self):
        """Clear the listening flag and report when no device was found."""
        self.listening = False
        if not self.devices:
            print("No InfiniTV found")


if __name__ == "__main__":
    # Command line: discover_infinitv $ip_address [$timeout]
    #   $ip_address  address of the local interface to listen on (required)
    #   $timeout     seconds to listen before giving up (optional)
    DEFAULT_TIMEOUT = 15

    # The interface address is mandatory; without it print the usage and fail.
    if len(sys.argv) < 2:
        print("discover_infinitv $ip_address [$timeout]")
        sys.exit(1)
    interface_ip = sys.argv[1]

    # A missing or non-numeric timeout silently falls back to the default.
    try:
        timeout = float(sys.argv[2])
    except (IndexError, ValueError):
        timeout = DEFAULT_TIMEOUT
    # time.sleep() rejects negative values (and NaN compares false here), so
    # treat those like any other unusable timeout.
    if not timeout >= 0:
        timeout = DEFAULT_TIMEOUT

    # Create the UPnP socket on the requested interface, listen in a
    # background thread for the requested time, then report and exit.
    finder = find_the_infiniTV(interface_ip=interface_ip)
    thread.start_new_thread(finder.listen, ())
    time.sleep(timeout)
    finder.stop_listen()