19 November 2018

Decoding Morse Code From Video

The following post is available as a Jupyter Notebook from the GitHub repo. You can download all the videos from there, or you should be able to watch the full color version here:

Art With a Message

In August of 2018, while walking around UCSD, I noticed a large, blinking light atop Urey Hall. Watching the flashes, I quickly realized it was blinking Morse Code. The question was, what did it say? Not being fluent in Morse, I captured the flashes as best I could with my phone in hopes of decoding the message offline.

The thought of watching a video over and over trying to decode dots and dashes from a signal lamp did not leave me terribly excited, so I decided to try doing it programmatically. Some thresholding, a little Fourier analysis, filtering, and a Morse Code tree should tease out the message from the video. Overall, a great project for learning some OpenCV. Sadly, my shaky cell phone video was not conducive to being easily analysed, so I vowed to return with a tripod and a better camera for some solid video.

Unfortunately, the light disappeared after that. Until November, when it was mounted on a light pole between Urey Hall and Mayer Hall, signalling proudly above campus. I got as high and as close as I could and still have a view of the light -- the top level of the molecular bridge between Bonner and Mayer Halls -- and set up my tripod and camera with my largest lens to record around 10 minutes of video. Since I was shooting with a "real" camera instead of my phone, I stopped down the aperture and quickened the shutter speed to get a nice, stable, contrasty video (exceptin' the bounce when I pressed the record button).

When I got to my computer, I shrank the video down to a more manageable, yet still viewable, size and stripped the audio. It looked like this, only the lights blinked:

I actually made a grayscale version of the video, because it's easier to play with (only one set of magnitudes to deal with instead of three). Video conversions were done with ffmpeg, because ffmpeg is awesome.
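For the record, the conversion is a one-liner along these lines (the input filename is a placeholder, and the exact filter chain is an assumption; any equivalent resize/grayscale/strip-audio combination works):

# "original.mov" is a placeholder for the raw camera file
$ ffmpeg -i original.mov -vf "scale=854:480,format=gray" -an 20181110-854x480-gray.mov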

With some usable video captured, the first thing to do was to make an OpenCV environment in Anaconda so I could play around in a Jupyter Notebook. On Linux, that's simple enough, since OpenCV is available as a conda package.

$ conda create --name opencv
$ source activate opencv
$ conda install -n opencv opencv matplotlib
$ jupyter notebook

Once we created a notebook to play in (this very one!), we could import our libraries, and switch to present tense in our narrative.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
In [2]:
cv2.__version__
Out[2]:
'3.4.2'

Getting Our Feet Wet

First things first, we need to be able to read our video. According to the OpenCV Python Video tutorial, that's just a matter of calling cv2.VideoCapture() with the path to the file as an argument. As long as cap.isOpened() is True, we are playing with video data.

In [3]:
video = '20181110-854x480-gray.mov'
cap = cv2.VideoCapture(video)
cap.isOpened()
Out[3]:
True
In [4]:
ret, frame = cap.read()
ret, type(frame)
Out[4]:
(True, numpy.ndarray)

So the frame is an ndarray. At this point, we're essentially playing in numpy. So much for learning much OpenCV.
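As a quick sanity check, we can poke at the frame directly. The exact shape below is an assumption based on the 854x480 clip; note that OpenCV decodes frames as 3-channel BGR by default, even for a grayscale source:

frame.shape, frame.dtype
# Expect something like ((480, 854, 3), dtype('uint8')) for this clip.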

In [5]:
cap.release()

Analysing the Data

Our first attack is going to be a simple threshold on the max pixel value of each video frame. To make things even easier on us, we can crop the video so it only contains the highest-contrast part. Specifically, the contents of the red box in this image:

Using our favorite image editing program, we find that the upper left corner of that box is at (440, 190), and that the box is 15 pixels wide and 60 pixels tall. That is exactly the information ffmpeg needs to crop the video for us.

$ ffmpeg -i 20181110-854x480-gray.mov -vf crop=15:60:440:190 20181110-crop.mov

Being able to visualize the data always helps with analysis, so let's create a couple of helper functions to avoid boilerplate.

In [6]:
%matplotlib inline
def plot(signal, width, title, xlabel, ylabel):
    """Plot the first `width` samples of `signal`."""
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.plot(np.arange(width), signal[:width])
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    
def histogram(signal, title, xlabel):
    """Plot histogram of `signal`"""
    fig = plt.figure()
    ax = fig.add_subplot(111)
    dist, bins = np.histogram(signal, bins=np.arange(signal.max()))
    ax.bar(bins[:-1], dist)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Count')

Next, a generator that yields the brightest pixel value for each frame of video:

In [7]:
def frame_maxes(fname):
    """ Generate the max values in each frame of the video file `fname`"""
    cap = cv2.VideoCapture(fname)
    while True:
        ret, frame = cap.read()
        if ret:
            yield np.max(frame)
        else:
            break
            
    cap.release()
In [8]:
# Max values as an ndarray.        
video = '20181110-crop.mov'
fmax_crop = np.array([m for m in frame_maxes(video)], dtype=int)
plot(fmax_crop, 200, 'Max Pixel Intensity', 'Frame', 'Max Pixel Intensity')

This looks very promising: it appears we can simply threshold the max values to determine our on/off points.

In [9]:
threshold = 125
signal_crop = fmax_crop > threshold
In [10]:
plot(signal_crop, 200, 'Signal value by frame', 'Frame', 'Signal (high/low)')

To turn this into Morse Code, we need to be able to distinguish dots, dashes, letter breaks, and word breaks. Each of those is determined by the length of a 'high' or 'low' run, and those runs are delimited by the transition edges. So let's find the indices of the transition edges and see what our run lengths are.
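As a quick illustration of the edge-finding trick, here is what the subtraction does on a tiny made-up signal (toy data, not from the video):

toy = np.array([0, 0, 1, 1, 1, 0, 0, 0, 0, 1])
toy_edges = toy[1:] - toy[:-1]   # +1 at rising edges, -1 at falling edges
np.where(toy_edges != 0)[0]      # -> array([1, 4, 8])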

In [11]:
signal_crop = signal_crop.astype(int)
edges_crop = signal_crop[1:] - signal_crop[:-1]
edge_crop_idx = np.where(edges_crop != 0)[0]
In [12]:
def signal_runs(edges, edge_idx):
    """Returns number of frames each 'high' and 'low' run are."""
    runs = edge_idx - np.concatenate((np.array([0], dtype=int), edge_idx[:-1]))
    if edges[edge_idx[0]] > 0:
        # First transition was low to high.
        highs = runs[1::2]
        lows = runs[::2]
    else:
        # First transition was high to low.
        highs = runs[::2]
        lows = runs[1::2]
        
    return highs, lows
In [13]:
highs, lows = signal_runs(edges_crop, edge_crop_idx)
In [14]:
histogram(highs, 'Highs', 'Duration (frames)')
histogram(lows, 'Lows', 'Duration (frames)')

'High' run durations are mostly 6 or 18 frames long, which makes sense since a 'dash' is supposed to be three times as long as a 'dot' in Morse Code. 'Low' run durations are mostly 6, 18, and 43 frames long, corresponding to gaps within a symbol, gaps between symbols (letters), and gaps between words, respectively.
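Those numbers also line up with the standard Morse timing ratios (dot = 1 unit, intra-character gap = 1, dash and letter gap = 3, word gap = 7). A quick back-of-the-envelope check, assuming a roughly six-frame unit (an eyeball estimate from the histograms):

unit = 6                       # frames per Morse time unit, eyeballed from the histograms
unit * 1, unit * 3, unit * 7   # -> (6, 18, 42), matching the observed 6, 18, and ~43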

We have all the information we need. Now we need a function to decode it all.

Decoding the Message

In [15]:
def morse_decode(edges, edge_idx, dot_dash_boundary, dash_space_boundary):
    """Decode Morse Code message in `edges` and `edge_idx`."""
    morse_lookup = {
        '.-': 'A',
        '-...': 'B',
        '-.-.': 'C',
        '-..': 'D',
        '.': 'E',
        '..-.': 'F',
        '--.': 'G',
        '....': 'H',
        '..': 'I',
        '.---': 'J',
        '-.-': 'K',
        '.-..': 'L',
        '--': 'M',
        '-.': 'N',
        '---': 'O',
        '.--.': 'P',
        '--.-': 'Q',
        '.-.': 'R',
        '...': 'S',
        '-': 'T',
        '..-': 'U',
        '...-': 'V',
        '.--': 'W',
        '-..-': 'X',
        '-.--': 'Y',
        '--..': 'Z',
        '.----': '1',
        '..---': '2',
        '...--': '3',
        '....-': '4',
        '.....': '5',
        '-....': '6',
        '--...': '7',
        '---..': '8',
        '----.': '9',
        '-----': '0'
    }
    
    message = []
    morse = []
    signal_high = edges[edge_idx[0]] < 0
    durations = edge_idx - np.concatenate((np.array([0], dtype=int), edge_idx[:-1]))

    for duration in durations.tolist():
        if signal_high:
            # `duration` is for a 'high' signal.
            morse.append('.' if duration < dot_dash_boundary else '-')
        else:
            # `duration` is for a 'low' signal.
            # We can ignore signal ('.'/'-') boundaries.
            if duration > dot_dash_boundary:
                # Symbol boundary. Decode.
                message.append(morse_lookup.get(''.join(morse), '?'))
                morse = []  # Reset signal aggregation.
                if duration > dash_space_boundary:
                    # Word boundary. Inject word break into message.
                    message.append(' ')
                    
        signal_high = not signal_high
            
    return ''.join(message)    
In [16]:
morse_decode(edges_crop, edge_crop_idx, 10, 30)
Out[16]:
'WUGHT WHAT HATH GOD WROUGLT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT EMHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WH'

"What hath god wrought", an appropriate first message for a Morse signalling lamp. No doubt UCSD will change the message occasionally.

Beyond Softballs

Thresholding works well for the cropped video. And the original video looks like it would have a good signal-to-noise ratio as well. Let's see how this code works on the uncropped, grayscale video (Note: this takes quite a bit longer to run than with the cropped video).

In [17]:
video = '20181110-854x480-gray.mov'
fmax_full = np.array([m for m in frame_maxes(video)], dtype=int)
plot(fmax_full, 200, 'Max Pixel Intensity', 'Frame', 'Max Pixel Intensity')

For this to work, the threshold value will need to be raised. This makes sense, since there are brighter static pixels in the image from the tower.

In [18]:
threshold = 220
signal_full = fmax_full > threshold
signal_full = signal_full.astype(int)
edges_full = signal_full[1:] - signal_full[:-1]
edge_full_idx = np.where(edges_full != 0)[0]
morse_decode(edges_full, edge_full_idx, 10, 30)
Out[18]:
'WUGHT WHAT HATH GOD WROUGLT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT EMHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WH'

While this is all well and good, manually determining the threshold every time I want to decode a video of the signalling tower goes against my laziness principle. What we need is an automatic threshold detector.

Looking at the fmax plots, we can see that the pixel intensities cluster around two values. This can be more easily seen with a histogram.

In [19]:
histogram(fmax_full, 'Max Intensities by Frame (full)', 'Intensities')
histogram(fmax_crop, 'Max Intensities by Frame (crop)', 'Intensities')

Intuitively, a good threshold value looks like it would be halfway between the mean values of the two clusters. This can be calculated iteratively by guessing an initial threshold value (say, halfway between the min and max values), grouping the signal into two bins (below and above the threshold), calculating the means of those bins, recomputing the threshold as the point halfway between those means, and repeating until the threshold doesn't move (much).

In [20]:
def compute_threshold(signal, tolerance):
    """Return theshold value to split `signal` into two bins."""
    # Initial threshold guess is the halfway point.
    threshold = signal.min() + ((signal.max() - signal.min()) / 2)
    last_threshold = 0
    while abs(threshold - last_threshold) > tolerance:
        low = signal[signal < threshold].mean()
        high = signal[signal >= threshold].mean()
        last_threshold = threshold
        threshold = low + ((high - low) / 2)
    
    return threshold
In [21]:
compute_threshold(fmax_crop, 1), compute_threshold(fmax_full, 1)
Out[21]:
(134.62130336304722, 216.18292914064398)

I eyeballed 125 and 220 as thresholds. Not too bad.
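(As an aside, OpenCV ships an automatic threshold picker of its own -- Otsu's method -- which would likely do a similar job here. A minimal sketch, assuming we first cast the max values to uint8 and reshape them into a single-column "image":)

otsu, _ = cv2.threshold(fmax_full.astype(np.uint8).reshape(-1, 1), 0, 255,
                        cv2.THRESH_BINARY + cv2.THRESH_OTSU)
otsu  # should land in the same neighborhood as compute_threshold()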

We can also use the threshold computation to figure out the dot-dash threshold, and (in a hacky way) the dash-space threshold.

Now, we can write a function that takes a video file as input and outputs the Morse Code recorded within. The function calculates all thresholds for itself, so you should be able to record Morse Code from another source (say, the Capitol Records Building in Los Angeles) and decode the message.

In [22]:
def decode_morse_video(path):
    """Decode Morse Code from video file `path`"""
    fmax = np.array([m for m in frame_maxes(path)], dtype=int)
    threshold = compute_threshold(fmax, 1)
    signal = (fmax > threshold).astype(int)
    edges = signal[1:] - signal[:-1]
    edge_idx = np.where(edges != 0)[0]
    highs, lows = signal_runs(edges, edge_idx)
    dot_dash_threshold = compute_threshold(highs, 1)
    # Filter out the short intra-character gaps before computing the dash-space threshold.
    dash_space_threshold = compute_threshold(lows[lows > dot_dash_threshold], 1)
    return morse_decode(edges, edge_idx, dot_dash_threshold, dash_space_threshold)
In [23]:
decode_morse_video('20181110-crop.mov')
Out[23]:
'WUGHT WHAT HATH GOD WROUGLT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT EMHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WH'
In [24]:
decode_morse_video('20181110-854x480-gray.mov')
Out[24]:
'WUGHT WHAT HATH GOD WROUGLT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT EMHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WHAT HATH GOD WROUGHT WH'