
Writing a Python App to Turn Our Christmas Tree On/Off Using AI


Posted on Dec 11, 2023 by Raul Muñoz

13 min read

Welcome to part eleven of our AI Powered Christmas Tree Series! The goal of this series is to serve as a guide for you to build your own AI powered Machine Learning Christmas Tree, thanks to the Arduino Portenta X8 SoM and Edge Impulse AI Platform.

In this tutorial, you will create a Python app to handle the MQTT messages that turn your AI powered Christmas Tree lights on and off.

Before jumping into this entry, be sure to check out the previous blog entries. You can find a summary of each tutorial, and a sneak peek of what’s coming next on our project overview page.

Using Python SDK for Linux According to Edge Impulse AI Documentation

The Edge Impulse for Linux documentation provides Node.js, Python, Go, and C++ SDKs capable of running your model with just a few lines of code. This serves as a basic template that you can build on to implement your own ideas.

Currently, we have an AI model which can detect people—and dogs in my case. The next step is to implement an action to occur based on this detection. This action will be turning the lights on/off. To do this, you will be working with the Python SDK. First you will download, build, and run the model using the Python SDK example. Then you’ll enhance the example code, implementing MQTT messages to control your Christmas Tree's lights!

Prerequisites

This is part eleven of our AI Powered Christmas Tree Series. Please ensure you’ve completed the earlier parts before continuing. You will also need:

  • Edge Impulse account

Edge Impulse Command Line Interface (CLI)

In our previous tutorials, you have mainly used the Edge Impulse Command Line Interface (CLI) Docker container for testing your model. The Dockerfile for this CLI also includes linux-sdk-python. The relevant line in the Dockerfile for linux-sdk-python is:

RUN git clone https://github.com/edgeimpulse/linux-sdk-python

If you don’t have the Docker image running on your device, follow the instructions in the previous tutorial to run it again.

Once you have it up and running, navigate to the following directory inside the Edge Impulse CLI Container:

device-docker:~$ cd /linux-sdk-python/examples/image

Download and Build the Model for Linux Devices

Before running the Python SDK example, you have to download the model and build it for your specific device. The command below downloads and builds the model:

device-docker:~$ edge-impulse-linux-runner --download modelfile.eim

The Edge Impulse for Linux models are delivered in .eim format. This is an executable that contains your signal processing and ML code, compiled with optimizations for your processor or GPU.

Running Python SDK

Run classify.py and point it to the model you just downloaded:

device-docker:~$ python3 classify.py modelfile.eim

Found 1 bounding boxes (29 ms.)
	people (0.77): x=48 y=144 w=8 h=24
Found 1 bounding boxes (28 ms.)
	people (0.83): x=88 y=160 w=8 h=16
Found 3 bounding boxes (28 ms.)
	people (0.93): x=56 y=144 w=8 h=8
	people (0.93): x=88 y=160 w=8 h=16

Analyzing the Python Code Blocks

The classify.py file can be found on GitHub: classify.py

Let’s take a look at some of the more interesting code blocks:

import device_patches       # Device specific patches for Jetson Nano (needs to be before importing cv2)

import cv2
import os
import sys, getopt
import signal
import time
from edge_impulse_linux.image import ImageImpulseRunner

The import device_patches line is specific to the NVIDIA® Jetson Nano™ and can be removed. The other imports are standard imports for the project.

# if you don't want to see a camera preview, set this to False
show_camera = True
if (sys.platform == 'linux' and not os.environ.get('DISPLAY')):
    show_camera = False

...
...
...
                if (show_camera):
                    cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                    if cv2.waitKey(1) == ord('q'):

The classify.py example can display the camera image live on the screen. Because we are running inside a container without access to a display, this is unnecessary, so it will also be removed.

def get_webcams():
    port_ids = []
    for port in range(5):
        print("Looking for a camera in port %s:" %port)
        camera = cv2.VideoCapture(port)
        if camera.isOpened():
            ret = camera.read()[0]
            if ret:
                backendName =camera.getBackendName()
                w = camera.get(3)
                h = camera.get(4)
                print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port))
                port_ids.append(port)
            camera.release()
    return port_ids

This function searches for cameras on a set of ports, retrieves information about the found cameras, and returns a list of port numbers where cameras were detected. In our case, it will find the camera connected to /dev/videoX, and return it as a port number.

Main Classifier Loop

The main function begins by opening the camera, initializing the Edge Impulse module, and starting a for loop which gets the pictures from the camera.

For every new picture, it runs the classifier:

            for res, img in runner.classifier(videoCaptureDeviceId):

This classifier loop continuously captures frames from the camera, sending them to the Edge Impulse model for classification. It then processes and displays the results:

Found 3 bounding boxes (28 ms.)
	people (0.93): x=56 y=144 w=8 h=8
	people (0.93): x=88 y=160 w=8 h=16

In this example, the model has detected 3 objects, 2 of which it has identified as people. The confidence score (0.93) for both indicates a high level of confidence in the classification. The coordinates give the bounding box’s top-left corner position, along with its width and height.
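As the script later in this tutorial shows, each classification result arrives as a dictionary whose res["result"]["bounding_boxes"] entry is a list of boxes carrying label and value keys. A small helper (hypothetical, not part of the SDK) can filter detections by label and confidence in the same way:

```python
# Sketch of filtering Edge Impulse bounding-box results by label and
# confidence. The 'label'/'value' keys match those used later in this
# tutorial; the sample `res` dict below is illustrative, not real output.

def confident_detections(res, label='people', threshold=0.8):
    """Return bounding boxes matching `label` with confidence above `threshold`."""
    boxes = res.get("result", {}).get("bounding_boxes", [])
    return [bb for bb in boxes if bb['label'] == label and bb['value'] > threshold]

# Illustrative result resembling the output above
res = {"result": {"bounding_boxes": [
    {"label": "people", "value": 0.93},
    {"label": "people", "value": 0.93},
    {"label": "dog", "value": 0.55},
]}}

print(len(confident_detections(res)))  # 2
```

The same list comprehension pattern is what the final script condenses into a single any(...) expression.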

Changing the Example for the Christmas Tree

Now that you are familiar with the generic example code, it is time to modify it to actually control the lights on our Christmas Tree! The following code was created to turn on and off the Christmas Tree lights, depending on how much time has passed since it last detected a person.

Most of the code is the same as the classify.py example, apart from the integration of MQTT commands sent to turn on/off the Christmas Tree lights, and some modifications to the core loop that counts the people label.

christmas-tree.py

#!/usr/bin/env python

import cv2
import os
import sys, getopt
import signal
import time
import random
from edge_impulse_linux.image import ImageImpulseRunner
from paho.mqtt import client as mqtt_client


broker = '192.168.15.97'
port = 1883
topic = "cmnd/switch/POWER1"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'

runner = None

def publish(client, msg):
    result = client.publish(topic, msg)
    status = result[0]
    if status == 0:
        print(f"Send `{msg}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")

def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port, 2)
    return client

def now():
    return round(time.time() * 1000)

def get_webcams():
    port_ids = []
    for port in range(5):
        print("Looking for a camera in port %s:" %port)
        camera = cv2.VideoCapture(port)
        if camera.isOpened():
            ret = camera.read()[0]
            if ret:
                backendName =camera.getBackendName()
                w = camera.get(3)
                h = camera.get(4)
                print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port))
                port_ids.append(port)
            camera.release()
    return port_ids

def sigint_handler(sig, frame):
    print('Interrupted, stopping the program')
    if runner:
        runner.stop()
    sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)

def help():
    print('python classify.py <path_to_model.eim> <Camera port ID, only required when more than 1 camera is present>')

def main(argv):
    try:
        opts, args = getopt.getopt(argv, "h", ["help"])
    except getopt.GetoptError:
        help()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            help()
            sys.exit()

    if len(args) == 0:
        help()
        sys.exit(2)

    model = args[0]

    dir_path = os.path.dirname(os.path.realpath(__file__))
    modelfile = os.path.join(dir_path, model)

    print('MODEL: ' + modelfile)

    with ImageImpulseRunner(modelfile) as runner:
        try:
            model_info = runner.init()
            print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"')
            labels = model_info['model_parameters']['labels']
            if len(args)>= 2:
                videoCaptureDeviceId = int(args[1])
            else:
                port_ids = get_webcams()
                if len(port_ids) == 0:
                    raise Exception('Cannot find any webcams')
                if len(args)<= 1 and len(port_ids)> 1:
                    raise Exception("Multiple cameras found. Add the camera port ID as a second argument to use to this script")
                videoCaptureDeviceId = int(port_ids[0])

            client = connect_mqtt()
            camera = cv2.VideoCapture(videoCaptureDeviceId)
            camera.set(cv2.CAP_PROP_FPS, 1)
            fps = int(camera.get(5))
            print("Initializing camera at port %s, FPS %d" % (videoCaptureDeviceId, fps))
            ret = camera.read()[0]
            if ret:
                backendName = camera.getBackendName()
                w = camera.get(3)
                h = camera.get(4)
                print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId))
                camera.release()
            else:
                raise Exception("Couldn't initialize selected camera.")

            next_frame = 0
            christmas_tree_on = False
            turn_off_christmas_tree(client)
            person_detection_count = 0
            last_detection_time = 0

            for res, img in runner.classifier(videoCaptureDeviceId):
                if (next_frame > now()):
                    time.sleep((next_frame - now()) / 1000)

                if "bounding_boxes" in res["result"].keys():
                    person_detected = any(bb['label'] == 'people' and bb['value'] > 0.8 for bb in res["result"]["bounding_boxes"])

                    if person_detected:
                        last_detection_time = now()
                        person_detection_count += 1

                    print('Current person detection count:', person_detection_count)
                    if person_detection_count >= 2 and not christmas_tree_on:
                        print('Two or more people detected')
                        christmas_tree_on = True
                        turn_on_christmas_tree(client)
                        person_detection_count = 0

                    if christmas_tree_on and now() - last_detection_time >= 3000:
                        print('Turning off Christmas tree after 3 seconds of last detection')
                        christmas_tree_on = False
                        turn_off_christmas_tree(client)
                        person_detection_count = 0

                next_frame = now() + 50
        finally:
            if (runner):
                print('Stopping runner')
                runner.stop()

def turn_on_christmas_tree(client):
    client = connect_mqtt()
    publish(client, 'ON')
    print('Christmas tree turned on')

def turn_off_christmas_tree(client):
    client = connect_mqtt()
    publish(client, 'OFF')
    print('Christmas tree turned off')

if __name__ == "__main__":
    main(sys.argv[1:])
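The script adds one dependency beyond the SDK example: paho-mqtt. Inside the Edge Impulse CLI container, linux-sdk-python and OpenCV should already be present; if anything is missing, the standard PyPI packages can be installed with pip (verify the package names against your container setup):

```shell
pip3 install paho-mqtt opencv-python edge_impulse_linux
```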

MQTT Setup

This function is responsible for connecting to the MQTT Broker:

def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port, 2)
    return client

broker, port, topic and client_id are defined earlier in the script. These values will be set depending on how you have configured your MQTT switch. Refer back to Using an MQTT Switch Module for more information on this.

broker = '192.168.15.97'
port = 1883
topic = "cmnd/switch/POWER1"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
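The topic follows the cmnd/<device>/POWER<n> command pattern used by Tasmota-style MQTT switches; if your switch runs different firmware, adjust the topic accordingly. A tiny helper (hypothetical, not part of the script) makes the convention explicit:

```python
# Hypothetical helper showing how Tasmota-style command topics are composed.
# 'cmnd/<device>/POWER<relay>' with payloads ON/OFF/TOGGLE is the usual
# convention; adapt it if your switch firmware differs.

def command_topic(device: str, relay: int = 1) -> str:
    return f"cmnd/{device}/POWER{relay}"

print(command_topic("switch"))     # cmnd/switch/POWER1
print(command_topic("switch", 2))  # cmnd/switch/POWER2
```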

Publishing the Message

This function publishes your message to the cmnd/switch/POWER1 topic:

def publish(client, msg):
    result = client.publish(topic, msg)
    status = result[0]
    if status == 0:
        print(f"Send `{msg}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")
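The result returned by client.publish() is an MQTTMessageInfo object; indexing it with [0] yields the return code, where 0 (paho's MQTT_ERR_SUCCESS) means the message was queued successfully. The broker-free sketch below uses a stub client (hypothetical, for illustration) to show that check in isolation:

```python
# Broker-free sketch of the status check in publish(). In paho-mqtt,
# client.publish() returns an MQTTMessageInfo that can be indexed like the
# (rc, mid) tuple older versions returned; rc == 0 means success.

topic = "cmnd/switch/POWER1"

def publish(client, msg):
    result = client.publish(topic, msg)
    status = result[0]
    if status == 0:
        print(f"Send `{msg}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")
    return status

class FakeClient:
    """Stand-in for mqtt_client.Client; always reports success."""
    def publish(self, topic, msg):
        return (0, 1)  # (rc, mid)

publish(FakeClient(), 'ON')  # prints the "Send ..." line
```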

While these two functions are used to send the ON and OFF messages to control the Christmas Tree lights:

def turn_on_christmas_tree(client):
    client = connect_mqtt()
    publish(client, 'ON')
    print('Christmas tree turned on')

def turn_off_christmas_tree(client):
    client = connect_mqtt()
    publish(client, 'OFF')
    print('Christmas tree turned off')

In summary, calling turn_on_christmas_tree(client) sends an ON message to the cmnd/switch/POWER1 MQTT topic using the MQTT client created by connect_mqtt(). This turns the Christmas Tree lights on. A message is then printed to the console stating that the Christmas Tree lights have been turned on.

Main Image Infinite Loop

The main function begins the same as in the example above. It opens the camera, initializes the Edge Impulse module, and starts a for loop which gets the pictures from the camera.

It contains a similar classify loop as before, but with some additions.

First, it keeps a running count of the number of people detected and records the time of the last detection. Initially, both are zero:

person_detection_count = 0
last_detection_time = 0

After classification, if any bounding box with the people label has a confidence score above 0.8, person_detection_count is increased by 1 and last_detection_time is updated:

if "bounding_boxes" in res["result"].keys():
    person_detected = any(bb['label'] == 'people' and bb['value'] > 0.8 for bb in res["result"]["bounding_boxes"])
    if person_detected:
        last_detection_time = now()
        person_detection_count += 1

Once person_detection_count reaches or exceeds 2, the Christmas Tree lights are turned on. The threshold of 2 helps filter out occasional false detections.

if person_detection_count >= 2 and not christmas_tree_on:
    print('Two or more people detected')
    christmas_tree_on = True
    turn_on_christmas_tree(client)
    person_detection_count = 0

person_detection_count is set back to 0 to ensure that it maintains a fresh state for the next detection event and does not continue to accumulate from previous detections.

Finally, if the Christmas Tree lights are currently on, and it has not identified any people for 3 seconds, the lights will be turned off. person_detection_count is also reset to 0 to ensure that each new instance of people detection is evaluated independently:

if christmas_tree_on and now() - last_detection_time >= 3000:
    print('Turning off Christmas tree after 3 seconds of last detection')
    christmas_tree_on = False
    turn_off_christmas_tree(client)
    person_detection_count = 0
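The on/off decision depends only on three pieces of state, so it can be factored into a small, broker-free function for testing. This refactor is a sketch (not part of the original script) using the same thresholds: 2 detections to turn on, 3000 ms without a detection to turn off. It returns 'ON', 'OFF', or None so the caller decides when to publish:

```python
# Sketch of the tree's on/off logic as a pure function. State is the tuple
# (tree_on, count, last_seen_ms); the function mirrors the two checks in the
# main loop and returns the action to take, if any.

ON_THRESHOLD = 2
OFF_TIMEOUT_MS = 3000

def step(state, person_detected, now_ms):
    tree_on, count, last_seen = state
    action = None
    if person_detected:
        count += 1
        last_seen = now_ms
    if count >= ON_THRESHOLD and not tree_on:
        tree_on, count, action = True, 0, 'ON'
    if tree_on and now_ms - last_seen >= OFF_TIMEOUT_MS:
        tree_on, count, action = False, 0, 'OFF'
    return (tree_on, count, last_seen), action

# Simulated frames: two detections turn it on, 3100 ms of silence turns it off
state = (False, 0, 0)
state, action = step(state, True, 1000)
state, action = step(state, True, 1100)
print(action)  # ON
state, action = step(state, False, 4200)
print(action)  # OFF
```

Keeping the logic pure like this makes it easy to unit test without a camera or a broker attached.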

Here is some example output of the script in action:

python3 christmas-tree.py modelfile.eim

Initializing camera at port 1, FPS 5
Camera V4L2 (480.0 x 640.0) in port 1 selected.
Send `OFF` to topic `cmnd/switch/POWER1`
Christmas tree turned off
Current person detection count: 0
Current person detection count: 0
Current person detection count: 1
Current person detection count: 2
Two or more people detected
Send `ON` to topic `cmnd/switch/POWER1`
Christmas tree turned on
Current person detection count: 1
Current person detection count: 2
Current person detection count: 3
Current person detection count: 4
Current person detection count: 5
Current person detection count: 6
Current person detection count: 7
Current person detection count: 8
Turning off Christmas tree after 3 seconds of last detection
Send `OFF` to topic `cmnd/switch/POWER1`
Christmas tree turned off
Current person detection count: 0
Current person detection count: 0
Current person detection count: 0
Current person detection count: 1
Current person detection count: 2
Two or more people detected
Send `ON` to topic `cmnd/switch/POWER1`
Christmas tree turned on
Current person detection count: 1
Current person detection count: 2
Current person detection count: 3

In this example, the application successfully detected people and turned the Christmas Tree lights on. It then turned them off after there were no detections for 3 seconds, before finally turning them back on upon detecting people again.

Recap and Conclusion

In this tutorial, you have modified the Python SDK example to create something unique to your project. By integrating MQTT messages to control the lighting, you now have your very own AI powered Christmas Tree!

Ideally, this application should start every time the device is powered on. Join us in the next tutorial to achieve this, as we’re going to create a Docker Compose application that launches a container and starts classifying people automatically when the device boots.

Continue Building Your AI-Powered Christmas Tree

If you are interested in our entire 14-part series on how to build an AI powered Christmas tree, browse through each section below. Each part brings you closer to mastering your build:
