Close

Developing a simple UPnP/SSDP scan app

A project log for ZeroPhone - a Raspberry Pi smartphone

Pi Zero-based open-source mobile phone (that you can assemble for 50$ in parts)

ArsenijsArsenijs 07/21/2017 at 22:100 Comments

I'm sorry for the lack of code highlighting, especially given that it's crucial in this worklog. It seems to be some kind of Hackaday.io glitch - I hope it'll get fixed soon.

Sometimes, I find interesting articles that explain various network security concepts in a simple way. This week, I've found an article (in Russian, but the part that the snippet is based on is available in English) about UPnP and botnets, with some Python code snippets - and I've decided to build an app around those snippets. As I've built it now, I'm going to describe my app writing workflow - while I still remember it =)

First, SSH into my ZeroPhone and try this snippet (from the article in Russian):

#!/usr/bin/env python2
import socket  
import sys
dst = "239.255.255.250"  
if len(sys.argv) > 1:  
    dst = sys.argv[1]
st = "upnp:rootdevice"  
if len(sys.argv) > 2:  
    st = sys.argv[2]
msg = [  
    'M-SEARCH * HTTP/1.1',
    'Host:239.255.255.250:1900',
    'ST:%s' % (st,),
    'Man:"ssdp:discover"',
    'MX:1',
    '']
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)  
s.settimeout(10)  
s.sendto('\r\n'.join(msg), (dst, 1900) )
while True:  
    try:
        data, addr = s.recvfrom(32*1024)
    except socket.timeout:
        break
    print "[+] %s\n%s" % (addr, data)

 It works and prints stuff! Furthermore, it can be tweaked - the most tweakable value seems to be the timeout value, as that's effectively how long the "scanning" part will be running. Let's convert it into a simple app!

---------- more ----------

I like to take existing not-yet-bloated apps as a base for new apps - it's a simple "cp apps/old/ apps/new/" The most suitable app for this seems to be the I2C device scan app  - also a snippet that I wrapped into an app =) For day-to-day development, I use tmux - it's very convenient to have dual panes, one with script being edited, one with shell where I run the script.

I'm still running an active SSH session into a ZeroPhone - I'll make it fullscreen, it's easier that way. First, I cd into main pyLCI directory - for me, it's in /root/pyLCI, later on it's probably going to be in /home/pi/pyLCI. Then, I launch tmux, split into two panes vertically - enough to be a good workspace! 

My layout while debugging a small checkbox label-related bug - two nano windows and running code, all in one window


Networking-related apps (even apps that can help with network pentesting) are, for now, stored in apps/network_apps/ directory. So, I create a new directory there - apps/network_apps/nmap. Then, I copy main.py from I2C scan app into that directory, and create an empty __init__.py. Now, if we were to launch it right now, it'd be absolutely the same as I2C scan app - because the code is the same, we just copied it over =) Now, time to edit the code - but first, what are the requirements?

  1. "Scan" button, which triggers scan, then displays the results
  2. Timeout needs to be adjustable through the UI. pyLCI has a suitable UI element for that - IntegerAdjustInput, which lets you increment/decrement an integer using "up" and "down" keys.
  3. Timeout, if set to a custom value, needs to persist between pyLCI launches - so we'll use a config file.

First thing I usually do is defining the app menu contents. It helps me understand what are the user-exposed functions, how are they linked together, and thus helps me understand the structure of the app:

main_menu_contents = [
["Scan", run_scan],
["Change timeout", adjust_timeout]
]

App starts with a menu, so the "callback"  function will look like this:

def callback():
    Menu(main_menu_contents, i, o, "UPnP/SSDP app menu").activate()

What remains is to write "run_scan" and "adjust_timeout" - simple, right? =) Let's start with "adjust_timeout" - it should take value from the config that's currently loaded, let user adjust it and save the updated config back to the config file:

def adjust_timeout():
    global config
    timeout = IntegerAdjustInput(config["timeout"], i, o, message="Socket timeout:").activate()
    if timeout is not None and timeout > 0:
        config["timeout"] = timeout
        write_config(config, config_path)

Our app is not class-based, so we're using a global variable for config storage. I like to use dictionaries for configs, and pack them into JSON - coincidentally, pyLCI offers two functions, read_config(path) and write_config(config, path), that work on JSON-encoded data structures in files.

Now, we need to write something that actually loads the config file. I believe it should be safe to do this as the app is imported, right after the import statements. We need to figure out the correct location relative to the app folder, so that it doesn't load the main "config.json" (with input and output device definitions), but an app-specific one. It's basically boilerplate code, soI published some code snippets on the ZeroPhone Wiki - just copy-paste them into the app =) Also, it has a snippet that detects if config file is missing/malformed - I'll use that one, too, I can just copy-paste it and get some failsafe functionality =)

current_module_path = os.path.dirname(sys.modules[__name__].__file__)
config_filename = "config.json"
default_config = '{"timeout":1,"dst":"239.255.255.250","st":"upnp:rootdevice"}'
config_path = os.path.join(current_module_path, config_filename)
try:
    config = read_config(config_path)
except (ValueError, IOError):
    print("{}: broken/nonexistent config, restoring with defaults...".format(menu_name))
    with open(config_path, "w") as f:
        f.write(default_config)
    config = read_config(config_path)

The "run_scan" function is the only one left. The first part is the same as in the Python script I'm re-making, but I add a Printer() - notice that it runs for 0 seconds, basically, printing text on display and exiting - this is for user-friendliness, so that user knows what the app is doing right now =) Also, some parameters are taken from config, of course - not all of them are editable from the UI, but at least they're changeable using the config file.

def run_scan():
    Printer("Scanning:", i, o, 0)
    msg = [
        'M-SEARCH * HTTP/1.1',
        'Host:{}:1900'.format(config["dst"]),
        'ST:{}'.format(config["st"]),
        'Man:"ssdp:discover"',
        'MX:1',
        '']
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    s.settimeout(config["timeout"])
    s.sendto('\r\n'.join(msg), (config["dst"], 1900) )

Then, let's actually scan the devices. To store scan results, I add an OrderedDict - it feels more user-friendly to output requests in the same order they were received. Also, I print tracebacks any exceptions that were received that aren't socket timeout exceptions -  logging would be a better idea, but for now there isn't a well-defined pyLCI-wide logging facility - so logging will be added once the support is there. Address is also received as a (string, int) tuple (IP, port) - but it only really makes sense to store it as a string, so it's converted to one.

    found_devices = OrderedDict()
    while True:
        try:
            data, addr = s.recvfrom(32*1024)
        except socket.timeout:
            break
        except:
            print(format_exc())
        else:
            ip_str = "{}:{}".format(*addr)
            found_devices[ip_str] = data

The last part is outputting values to screen. Responses are received as multiple-line HTTP-like requests, and only relatively small part of a request is useful. So, for now it makes sense to just show IPs to the user, and show full request contents if user wants to see them. There's certainly some more useful data in responses that could be displayed, but it'd require parsing the responses - this app was literally written in half an hour's time, so I didn't really think of that as a feature - guess that's a TODO =) 

So, I get a list of IPs and I have to show them to user, so that user could click on one, see data, go back, click on another one, see data for that IP - that's a job for a Menu. There's a very similar Listbox UI element that could be used, but it returns immediately when the value is selected, so it's not suitable for this task (you'd implement, say, audio channel selector with a Listbox, but audio settings menu with a Menu.). Right now, each IP address will take one row of characters on screen - if I decide to add more data to the IP view (like, get device make and model from response), I can use entry_height attribute of Menu to be able to show two and more rows of data for each Menu entry.

For menu, you need a list of two-element lists, where first element will be entry representation on the screen, and second element will be a function that'll be called when entry is selected (callback). I like to write small generators for that: 

    if not found_devices:
        Printer("No devices found", i, o, 2)
    else:
        data = [[ip, lambda x=ip, y=d: read_info(x, y)] for ip, d in found_devices.iteritems()]
        Menu(data, i, o).activate()

I have one function, and it needs to be called with different arguments, depending on the entry selected - that's a job for lambdas =) Notice the x= , y= trick - lambdas do not create namespaces and don't store values which are passed as arguments, so I have to "store" them this way. Then, I construct a Menu and pass control to it on the spot - that's where user gets the list of IPs on the screen. What's in the read_info function?

def read_info(ip_str, data):
    print("[+] {}\n{}".format(ip_str, data))
    Printer(ffs("[+] {}\n{}".format(ip_str, data), o.cols), i, o, 5

It simply shows the request to the user (in the same format that the original snippet did), printing it to console (as a side effect, will remove soon). Notice the ffs("long string with linebreaks", o.cols) - it takes a string and makes a list suitable for on-screen output, splitting it by words where possible and marking linebreaks with empty on-screen lines. I actually think there should be a PrettyPrinter, basically a Printer(ffs(data, ...)...) - maybe I'll do it in spare time (can somebody do it earlier?). 

There's this bad thing - there isn't yet an UI element for reading a block of text without interruptions. Printer UI element will eventually exit on its own - after all the screens pass by. I set timeout for each screen to be 5 seconds, but it can easily not be enough for a slow reader, and it isn't as user-friendly anyway. So, such an UI element is in the TODO.


This is pretty much it! We have a working app - it scans for UPnP/SSDP devices and shows any responses it gets. Of course, it's not complete- as of now, it needs some more things to be more useful:

  1. Saving full and individual scan results into files in app folder when some key is pressed - like NMap app already does. I don't think this feature will be useful enough to be a snippet in the wiki, but I can pretty much copy-paste the related code from the NMap app and have this functionality.
  2. Handle more errors, for example, when network is not available, when socket can't be opened, maybe even handle malformed responses - there will be a lot of ground to cover once this is tried on different devices.
  3. Once UI element for reading blocks of text appears, use it instead of Printer
  4. Add error logging - to some file
  5. Parse responses and show useful parts in the IP overview

Here are links to the app code - this is the commit I made today, and this is current version of the app at any point of time. Something else? 


Give your feedback in the comments, try this app on your network (it's not ZeroPhone-specific, so will work on your Linux PC with a pyLCI emulator), send bug reports and pull requests - all that stuff =) I hope that this writeup will be useful as a tutorial, giving you insights into how to write apps, as well as into decisions involved in making a user-friendly and useful ZeroPhone app.

Discussions