Close

Python script

A project log for ISS HDEV image availability

The "High Definition Earth Viewing" experiment onboard the ISS runs a few cameras, but not always. Let's find out when video is available!

christophChristoph 10/11/2016 at 19:290 Comments

So here's the first working python script, ignoring all good practices like checking for return values or doing things in the right place. It also doesn't output zero and ones, but a float that is the L2 norm between the current frame and the reference frame. But in a way it does the job:

#!/usr/bin/env python

from __future__ import print_function

import sys

import gi

from gi.repository import GObject as gobject, Gst as gst
from streamlink import Streamlink, StreamError, PluginError, NoPluginError

import cv2
import numpy

def exit(msg):
    print(msg, file=sys.stderr)
    sys.exit()


class StreamlinkPlayer(object):
    def __init__(self):
        self.fd = None
        self.mainloop = gobject.MainLoop()

        # This creates a playbin pipeline and using the appsrc source
        # we can feed it our stream data
        self.pipeline = gst.Pipeline.new("player")
        source = gst.ElementFactory.make("uridecodebin", "decodebin")
        source.set_property("uri", "appsrc://")
        self.pipeline.add(source)

        self.vconverter = gst.ElementFactory.make("videoconvert", "vconverter")
        vsink = gst.ElementFactory.make("appsink", "videosink")
        vsink.set_property("emit-signals", True)
        vsink.set_property("max-buffers", 1)
        caps = gst.caps_from_string("video/x-raw, format=(string){RGB}")
        vsink.set_property("caps", caps)
        vsink.connect("new-sample", self.on_new_sample)
        filter = gst.ElementFactory.make("videorate", "fpsfilter")
        filter.set_property("max-rate", 1)
        self.pipeline.add(self.vconverter)
        self.pipeline.add(filter)
        self.pipeline.add(vsink)
        self.vconverter.link(filter)
        filter.link(vsink)
        # self.vsink = vsink

        source.connect("source-setup", self.on_source_setup)
        source.connect("pad-added", self.on_pad_added)
        
        # Creates a bus and set callbacks to receive errors
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message::eos", self.on_eos)
        self.bus.connect("message::error", self.on_error)

    def exit(self, msg):
        self.stop()
        exit(msg)

    def stop(self):
        # Stop playback and exit mainloop
        self.pipeline.set_state(gst.State.NULL)
        self.mainloop.quit()

        # Close the stream
        if self.fd:
            self.fd.close()

    def play(self, stream):
        # Attempt to open the stream
        try:
            self.fd = stream.open()
        except StreamError as err:
            self.exit("Failed to open stream: {0}".format(err))

        # Start playback
        self.pipeline.set_state(gst.State.PLAYING)
        self.mainloop.run()

    def on_source_setup(self, element, source):
        # When this callback is called the appsrc expects
        # us to feed it more data
        print("source setup")
        source.connect("need-data", self.on_source_need_data)
        print("done")
        
    def on_new_sample(self, appsink):
        sample = appsink.emit("pull-sample")
        buf = sample.get_buffer()
        caps = sample.get_caps()
        height = caps.get_structure(0).get_value('height')
        width = caps.get_structure(0).get_value('width')
        (result, mapinfo) = buf.map(gst.MapFlags.READ)
        if result == True:
            arr = numpy.ndarray(
                (height,
                 width,
                3),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=numpy.uint8)
            resized_refimage = cv2.resize(refArray, (width, height))
            sum = int(0)
            diff = cv2.norm(arr, resized_refimage, cv2.NORM_L2)
            print("diff = " + str(diff))
            
        buf.unmap(mapinfo)
        return gst.FlowReturn.OK

    def on_pad_added(self, element, pad):
        string = pad.query_caps(None).to_string()
        print(string)
        if string.startswith('video/'):
          pad.link(self.vconverter.get_static_pad("sink"))

    def on_source_need_data(self, source, length):
        # Attempt to read data from the stream
        try:
            data = self.fd.read(length)
        except IOError as err:
            self.exit("Failed to read data from stream: {0}".format(err))

        # If data is empty it's the end of stream
        if not data:
            source.emit("end-of-stream")
            return

        # Convert the Python bytes into a GStreamer Buffer
        # and then push it to the appsrc
        buf = gst.Buffer.new_wrapped(data)
        source.emit("push-buffer", buf)

    def on_eos(self, bus, msg):
        # Stop playback on end of stream
        self.stop()

    def on_error(self, bus, msg):
        # Print error message and exit on error
        error = msg.parse_error()[1]
        self.exit(error)


def main():
    if len(sys.argv) < 4:
        exit("Usage: {0} <url> <quality> <reference png image path>".format(sys.argv[0]))

    # Initialize and check GStreamer version
    gi.require_version("Gst", "1.0")
    gobject.threads_init()
    gst.init(None)

    # Collect arguments
    url = sys.argv[1]
    quality = sys.argv[2]
    refImage = sys.argv[3]
    global refArray
    image = cv2.imread(refImage)
    refArray = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)#cv2.COLOR_BGR2GRAY)
    refArray = cv2.blur(refArray, (3,3))

    # Create the Streamlink session
    streamlink = Streamlink()

    # Enable logging
    streamlink.set_loglevel("info")
    streamlink.set_logoutput(sys.stdout)

    # Attempt to fetch streams
    try:
        streams = streamlink.streams(url)
    except NoPluginError:
        exit("Streamlink is unable to handle the URL '{0}'".format(url))
    except PluginError as err:
        exit("Plugin error: {0}".format(err))

    if not streams:
        exit("No streams found on URL '{0}'".format(url))

    # Look for specified stream
    if quality not in streams:
        exit("Unable to find '{0}' stream on URL '{1}'".format(quality, url))

    # We found the stream
    stream = streams[quality]

    # Create the player and start playback
    player = StreamlinkPlayer()

    # Blocks until playback is done
    player.play(stream)

if __name__ == "__main__":
    main()
Next:

Discussions