commit c14396d84798cadb08adcbe9f6e1f05910c43876 Author: Ryan Reed Date: Sun Jul 3 13:54:04 2022 -0400 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..042b064 --- /dev/null +++ b/.gitignore @@ -0,0 +1,120 @@ +.ampy +# Phue Creds +bridge.dat +# Swap files +*.sw[a-p] + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..1a37534 --- /dev/null +++ b/README.md @@ -0,0 +1,122 @@ +# MicroPython Sensor Alarm + +# Table of Contents + + + +* [Overview](#overview) +* [Credits](#credits) +* [Parts](#parts) +* [Magnetic Switch - Setup](#magnetic-switch---setup) +* [Setup](#setup) + + + + +# Overview + +This project utilizes a magnetic (Reed) switch to flash a group of Philips Hue lights and/or send an email + +The script is designed to go into deepsleep so that it could be utilized on battery power. It wakes up from the magnetic switch movement, causing the alarm to trigger + +This is one of the my first micropython projects so I combined a lot of things, like Philips Hue and emails together unnecessary. Should have considered modules for each instead. + +# Credits +* [HueBridge](https://github.com/FRC4564/HueBridge) bridge (named [uhue.py](libary/uhue.py) here) - Small changes made to support RGB colors as well +* [µMail](https://github.com/shawwwn/uMail) + + +# Parts +| Part | Cost | Quantity | Total | +| ------------------------------------------------------------------------------------------------------------------------------------ | ---- | -------- | ----- | +| [Magnetic Switch](https://smile.amazon.com/gp/product/B0735BP1K4/) | 8.98 | 1 | 8.98 | +| [Project Box](https://smile.amazon.com/gp/product/B0002BBQUA/) | 6.51 | 1 | 6.51 | +| A ESP32 Controller (I used [Wemos LOLIN D32](https://www.aliexpress.com/item/WEMOS-LOLIN32-V1-0-0-wifi-bluetooth-board-based-ESP-32-4MB-FLASH/32808551116.html)) | 6.50 | 1 | 6.51 | +| Momentary switch | | 1 | | + + +# Magnetic Switch - Setup +![Wemos Lolin D32 Layout](static/wemos-lolin-d32-magnetic_switch.png) + +[Fritzing Project](static/wemos-lolin-d32-magnetic_switch.fzz) + +* Note 1: Above schematic has the magentic switch connected at pin 32. This should work although I actually use 16 +* Note 2: 10kΩ [Pull-up Resistor](https://duino4projects.com/pull-resistors-explained/) + +`GPIO16` has a pull-up resistor that can be enabled. No need for an external resistor. + +I noticed that the board would not boot if the magnetic switch was connect to `GPIO0`. It would only work if I connected the switch after the boot. Using `GPIO16` seems to get around this. + + +```python +from machine import Pin +p0 = Pin(16, Pin.IN, Pin.PULL_UP) +``` + +Example Output +```python +>>> from machine import Pin +>>> p0 = Pin(16, Pin.IN, Pin.PULL_UP) +>>> p0.value() # Open +1 +>>> p0.value() # Closed +0 +>>> p0.value() # Open +1 +>>> p0.value() # Closed +0 +``` + +# Setup + +1. Modify [config.py](config.py). Leave those features you don't want commented out (e.g. phue, smtp, button, etc): + +```python +config = {} +config["wifi_ssid"] = "MySSID" +config["wifi_pass"] = "Password" + +config["sensor_pin"] = 4 # Sets Pin.PULL_UP +config["device_name"] = "Front Door Alarm" # Device sending alert + +config["config_button"] = 23 # Pin containing stop/start button +config["config_led"] = 5 # Pin with LED + +# Delete/Comment Out If Not Needed +config["phue"]["group"] = 3 # the light group to flash +config["phue"]["color1"] = (244, 0, 0) +config["phue"]["color2"] = (0, 0, 204) +config["phue"]["flash_count"] = 3 + +config["smtp"]["subject"] = "Front door open" +config["smtp"]["from"] = "" +config["smtp"]["to"] = "" +config["smtp"]["username"] = "" +config["smtp"]["password"] = "" +config["smtp"]["server"] = "smtp.gmail.com" +config["smtp"]["ssl"] = True +config["smtp"]["port"] = 465 +``` + +2. Upload the scripts + +```bash +ampy --port /dev/ttyUSB0 put library/alarm.py +ampy --port /dev/ttyUSB0 put library/leds.py +ampy --port /dev/ttyUSB0 put library/uhue.py +ampy --port /dev/ttyUSB0 put library/umail.py +ampy --port /dev/ttyUSB0 put library/wifi.py +ampy --port /dev/ttyUSB0 put config.py +ampy --port /dev/ttyUSB0 put main.py +``` + +3. If first run, you should connect over serial and open the switch. The script should ask for you to press the connect button on the bridge. Additionally, the `bridge.dat` file can be backed up and saved for other sensors that might use them: + +```bash +ampy --port /dev/ttyUSB0 get bridge.dat +``` + +which looks like the following: +```bash +["", ""] +``` diff --git a/boot.py b/boot.py new file mode 100644 index 0000000..1fd2b28 --- /dev/null +++ b/boot.py @@ -0,0 +1,7 @@ +import gc +import wifi + +status = wifi.connect() +print(f"Network Settings: {status}") + +gc.collect() # Garbage collection diff --git a/config.py b/config.py new file mode 100644 index 0000000..3fb33a5 --- /dev/null +++ b/config.py @@ -0,0 +1,29 @@ +config = {} +config["wifi_ssid"] = "" +config["wifi_pass"] = "" + +config["sensor_pin"] = 4 # Sets Pin.PULL_UP +config["device_name"] = "Front Door Alarm" # Device sending alert + +# +# UNCOMMENT AND CONFIGURE FEATURES +# +config["config_button"] = 23 # Pin containing stop/start button +config["config_led"] = 5 # Pin with LED + +config["phue"] = {} +config["phue"]["group"] = 3 # the light group to flash +config["phue"]["color1"] = (244, 0, 0) +config["phue"]["color2"] = (0, 0, 204) +config["phue"]["flash_count"] = 3 + +config["smtp"] = {} +config["smtp"]["server"] = "" +config["smtp"]["ssl"] = True +config["smtp"]["port"] = 465 + +config["smtp"]["subject"] = "Front door open" +config["smtp"]["from"] = "" +config["smtp"]["to"] = "" +config["smtp"]["username"] = "" +config["smtp"]["password"] = "" diff --git a/library/alarm.py b/library/alarm.py new file mode 100644 index 0000000..c9eb93f --- /dev/null +++ b/library/alarm.py @@ -0,0 +1,67 @@ +""" +Various alarm related functions +""" +from time import sleep + +def phue_fetch_light_states(bridge, group_id): + """ + Fetch the current states of each light. + + Returns a list of dictionaries {"id": "", "state": {}} + """ + lights = [] + + group = bridge.getGroup(group_id) + for light_id in group["lights"]: + light = bridge.getLight(light_id) + lights.append({"id": light_id, "state": light["state"]}) + + return lights + +def phue_flash_lights(bridge, config): + """ + Transition between 2 colors + """ + transition_time = 2 + sleep_time = 0.5 + i = 1 + while i < config["flash_count"]: + bridge.setGroup( + config["group"], rgb=config["color1"], transitiontime=transition_time, on=True + ) + sleep(sleep_time) + bridge.setGroup(config["group"], rgb=config["color2"], transitiontime=transition_time) + sleep(sleep_time) + i = i + 1 + +def send_email(name, config): + """ + Send an email noting an issue + """ + import umail + smtp = umail.SMTP(config["server"], config["port"], ssl=config["ssl"]) + smtp.login(config["username"], config["password"]) + smtp.to(config["to"]) + smtp.write("From: {}\n".format(config["from"])) + smtp.write("To: {}\n".format(config["to"])) + smtp.write("Subject: {}\n".format(name)) + smtp.write("-- Generated by {} --\n".format(__name__)) + smtp.send() + smtp.quit() + smtp = None + +def trigger_alert(name, phue=None, smtp=None): + """ + Set off the various alerts + """ + if phue: + import uhue as hue + bridge = hue.Bridge() + lights = phue_fetch_light_states(bridge, phue["group"]) + phue_flash_lights(bridge, phue) + # Reset light colors + for light in lights: + bridge.setLight(light["id"], **light["state"]) + + if smtp: + send_email(name, smtp) diff --git a/library/leds.py b/library/leds.py new file mode 100644 index 0000000..f76ccea --- /dev/null +++ b/library/leds.py @@ -0,0 +1,15 @@ +from machine import Pin +from time import sleep + + +def blink(led_pin, count, delay=0.5): + led = Pin(led_pin, Pin.OUT) # Usually GPIO2 + led.value(1) # Start off + x = 0 + while x < count*2: + led.value(not led.value()) + sleep(delay) + x = x + 1 + +def test(led_pin=5): + led_blink(led_pin, 3, delay=0.2) diff --git a/library/uhue.py b/library/uhue.py new file mode 100644 index 0000000..d103d6a --- /dev/null +++ b/library/uhue.py @@ -0,0 +1,250 @@ +import socket +from time import sleep +import json +try: + import requests +except: + import urequests as requests + +# UPnP SSDP Search request header +HEADER = b"""M-SEARCH * HTTP/1.1\r +HOST: 239.255.255.250:1900\r +MAN: "ssdp:discover"\r +ST: ssdp:all\r +MX: 3\r +\r +""" + + +def rgb2xy(r, g, b): + ''' + Converts an RGB value to xyz coordinates, based on matrix transformations + of colorspace values here (Wide Gamut RGB): + http://www.brucelindbloom.com/index.html?Eqn_RGB_XYZ_Matrix.html + ''' + X = 0.7161046 * r + 0.1009296 * g + 0.1471858 * b + Y = 0.2581874 * r + 0.7249378 * g + 0.0168748 * b + Z = 0.0000000 * r + 0.0517813 * g + 0.7734287 * b + x = X / (X+Y+Z) + y = Y / (X+Y+Z) + return [x, y] + +class Bridge: + """Provides methods for connecting to and using Hue Bridge. Supports + Micropython, Python 2, and 3.""" + + def __init__(self,autosetup=True, debug=1): + self.debug = debug #0=no prints, 1=messages, 2=debug + self.IP = None + self.username = None + if autosetup: + self.setup() + + + def show(self,str,level=1): + """ Show debug output. """ + if self.debug >= level: + print(str) + + + def setup(self): + """ Loads bridge settings or attempts to establish them, if needed.""" + success = self.loadSettings() + if success: + # verify bridge settings work + try: + self.idLights() + success = True + except: + success = False + if not success: + if self.discover(): + self.show('Bridge located at {}'.format(self.IP)) + self.show('>>> Press link button on Hue bridge to register <<<') + if self.getUsername(): + success = self.saveSettings() + else: + self.show("Couldn't get username from bridge.") + else: + self.show("Couldn't find bridge on LAN.") + return success + + + def discover(self): + """ Locate Hue Bridge IP using UPnP SSDP search. Discovery will return + when bridge is found or 3 seconds after last device response. Returns IP + address or None.""" + #On ESP8266, disable AP WLAN to force use of STA interface + #import network + #ap = network.WLAN(network.AP_IF) + #ap.active(False) + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.sendto(HEADER, ('239.255.255.250',1900)) #UPnP Multicast + s.settimeout(3) + + IP = None + while IP == None: + data, addr = s.recvfrom(1024) + self.show(str(data),2) + lines = data.split(b'\r\n') + for l in lines: + tokens = l.split(b' ') + if tokens[0] == b'SERVER:': + product = tokens[3].split(b'/') + if product[0] == b'IpBridge': + IP = str(addr[0]) + break + + s.close() + self.IP = IP + return IP + + + def getUsername(self): + """ Get a developer API username from bridge. + Requires that the bridge link button be pressed sometime while polling. + Polls for 20 seconds (20 attempts at 1 second intervals). + Can timeout with error if bridge is non-responsive. + Returns username on success or None on failure.""" + url = 'http://{}/api'.format(self.IP) + data = '{"devicetype":"TapLight#mydevice"}' + username = None + count = 20 + while count > 0 and username == None: + resp = requests.post(url,data=data) + if resp.status_code == 200: + j = resp.json()[0] + self.show(j,2) + if j.get('success'): + username = str(j['success']['username']) + self.username = username + sleep(1) + count -= 1 + return username + + + def saveSettings(self): + """ Save bridge IP and username to bridge.dat file. + Returns True on success.""" + if self.IP and self.username: + f=open('bridge.dat','w') + f.write(json.dumps([self.IP,self.username])) + f.close() + return True + else: + return None + + + def loadSettings(self): + """ Load bridge IP and username from bridge.dat file and set base URL. + Returns True on success. """ + try: + f=open('bridge.dat') + except: + return None + l = json.load(f) + f.close() + self.IP = str(l[0]) + self.username = str(l[1]) + self.show('Loaded settings {} {}'.format(self.IP,self.username),2) + return True + + + def resetSettings(self): + """Delete current saved bridge settings and reinitiate.""" + from os import remove + remove('bridge.dat') + self.IP = None + self.username = None + self.setup() + + + def url(self,path): + """Return url for API calls.""" + return 'http://{}/api/{}/{}'.format(self.IP,self.username,path) + + + def get(self, path): + """Perform GET request and return json result.""" + url = self.url(path) + self.show(url,2) + resp = requests.get(url).json() + self.show(resp,2) + return resp + + + def put(self, path, data): + """Perform PUT request and return response.""" + url = self.url(path) + self.show(url,2) + data = json.dumps(data) + self.show(data,2) + resp = requests.put(url, data=data).json() + self.show(resp,2) + return resp + + + def allLights(self): + """Returns dictionary containing all lights, with detail.""" + """Large return set, not ideal for controllers with limited RAM.""" + return self.get('lights') + + + def idLights(self): + """Returns list of all light IDs.""" + ids = self.get('groups/0')['lights'] + for i in range(len(ids)): + ids[i] = int(ids[i]) + return ids + + + def getLight(self,id): + """Returns dictionary of light details for given ID.""" + return self.get('lights/{}'.format(str(id))) + + + def getLights(self): + """Iterates through each light to build and return a dictionary + of light IDs and names.""" + dict = {} + for i in self.idLights(): + dict[i] = str(self.getLight(i)['name']) + return dict + + + def setLight(self,id,**kwargs): + """Set one or more states of a light. + Ex: setLight(1,on=True,bri=254,hue=50000,sat=254)""" + if 'rgb' in kwargs: + r,g,b = kwargs.pop('rgb') + kwargs['xy'] = rgb2xy(r, g, b) + + self.put('lights/{}/state'.format(str(id)),kwargs) + + + def allGroups(self): + """Returns dictionary containing all groups, with detail.""" + return self.get('groups') + + + def getGroup(self,id): + """Returns dictionary of group details.""" + return self.get('groups/{}'.format(str(id))) + + + def getGroups(self): + """Returns dictionary of group IDs and names.""" + dict = {} + groups = self.allGroups() + for g in groups: + dict[int(g)] = str(groups[g]['name']) + return dict + + + def setGroup(self,id,**kwargs): + """Set one or more states of a group. + Ex: setGroup(1,bri_inc=100,transitiontime=40)""" + if 'rgb' in kwargs: + r,g,b = kwargs.pop('rgb') + kwargs['xy'] = rgb2xy(r, g, b) + self.put('groups/{}/action'.format(str(id)),kwargs) diff --git a/library/umail.py b/library/umail.py new file mode 100644 index 0000000..5b08c96 --- /dev/null +++ b/library/umail.py @@ -0,0 +1,108 @@ +# µMail (MicroMail) for MicroPython +# Copyright (c) 2018 Shawwwn +# License: MIT +import usocket + +DEFAULT_TIMEOUT = 10 # sec +LOCAL_DOMAIN = '127.0.0.1' +CMD_EHLO = 'EHLO' +CMD_STARTTLS = 'STARTTLS' +CMD_AUTH = 'AUTH' +CMD_MAIL = 'MAIL' +AUTH_PLAIN = 'PLAIN' +AUTH_LOGIN = 'LOGIN' + +class SMTP: + def cmd(self, cmd_str): + sock = self._sock; + sock.write('%s\r\n' % cmd_str) + resp = [] + next = True + while next: + code = sock.read(3) + next = sock.read(1) == b'-' + resp.append(sock.readline().strip().decode()) + return int(code), resp + + def __init__(self, host, port, ssl=False, username=None, password=None): + import ussl + self.username = username + addr = usocket.getaddrinfo(host, port)[0][-1] + sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM) + sock.settimeout(DEFAULT_TIMEOUT) + sock.connect(addr) + if ssl: + sock = ussl.wrap_socket(sock) + code = int(sock.read(3)) + sock.readline() + assert code==220, 'cant connect to server %d' % code + self._sock = sock + + code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN) + assert code==250, '%d' % code + if CMD_STARTTLS in resp: + code, resp = self.cmd(CMD_STARTTLS) + assert code==220, 'start tls failed, %d' % code + self._sock = ussl.wrap_socket(sock) + + if username and password: + self.login(username, password) + + def login(self, username, password): + self.username = username + code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN) + assert code==250, '%d' % code + + auths = None + for feature in resp: + if feature[:4].upper() == CMD_AUTH: + auths = feature[4:].upper().split() + assert auths!=None, "no auth method" + + from ubinascii import b2a_base64 as b64 + if AUTH_PLAIN in auths: + cren = b64("\0%s\0%s" % (username, password))[:-1].decode() + code, resp = self.cmd('%s %s %s' % (CMD_AUTH, AUTH_PLAIN, cren)) + elif AUTH_LOGIN in auths: + code, resp = self.cmd("%s %s %s" % (CMD_AUTH, AUTH_LOGIN, b64(username)[:-1].decode())) + assert code==334, 'wrong username %d' % code + code, resp = self.cmd(b64(password)[:-1]) + else: + raise Exception("auth(%s) not supported " % ', '.join(auths)) + + assert code==235 or code==503, 'auth error %d' % code + return code, resp + + def to(self, addrs): + code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN) + assert code==250, '%d' % code + code, resp = self.cmd('MAIL FROM: <%s>' % self.username) + assert code==250, 'sender refused %d' % code + + if isinstance(addrs, str): + addrs = [addrs] + count = 0 + for addr in addrs: + code, resp = self.cmd('RCPT TO: <%s>' % addr) + if code!=250 and code!=251: + print('%s refused' % addr) + count += 1 + assert count!=len(addrs), 'recipient refused, %d' % code + + code, resp = self.cmd('DATA') + assert code==354, 'data refused, %d' % code + return code, resp + + def write(self, content): + self._sock.write(content) + + def send(self, content=''): + if content: + self.write(content) + self._sock.write('\r\n.\r\n') # the five letter sequence marked for ending + line = self._sock.readline() + return (int(line[:3]), line[4:].strip().decode()) + + def quit(self): + self.cmd("QUIT") + self._sock.close() diff --git a/library/wifi.py b/library/wifi.py new file mode 100644 index 0000000..83b5384 --- /dev/null +++ b/library/wifi.py @@ -0,0 +1,47 @@ +import network + +try: + from config import config +except ImportError: + config = {} + +ap_if = network.WLAN(network.AP_IF) +wifi = network.WLAN(network.STA_IF) + +def connect(enable_ap=False): + ap_if.active(enable_ap) + if not config.get("wifi_ssid") or not config.get("wifi_pass"): + print("WARNING: wifi_ssid or wifi_pass not configured in config.py") + return status() + + if not wifi.isconnected(): + wifi.active(True) + wifi.connect(config["wifi_ssid"], config["wifi_pass"]) + + while not wifi.isconnected(): + pass + return status() + +def disconnect(): + wifi = network.WLAN(network.STA_IF) + wifi.active(False) + +def status(): + results = { + "ap": ap_if.ifconfig()[0] != "0.0.0.0", + "wifi": { + "ip": "0.0.0.0", + "netmask": "0.0.0.0", + "gateway": "0.0.0.0", + "dns": "0.0.0.0", + } + } + if wifi.isconnected(): + ifconfig = wifi.ifconfig() + results["wifi"] = { + "ip": ifconfig[0], + "netmask": ifconfig[1], + "gateway": ifconfig[2], + "dns": ifconfig[3], + } + return results diff --git a/main.py b/main.py new file mode 100644 index 0000000..a6dc7ea --- /dev/null +++ b/main.py @@ -0,0 +1,36 @@ +from machine import deepsleep, Pin, wake_reason +from esp32 import wake_on_ext0 +from time import sleep +from config import config +from leds import led_blink + +def main(): + pin_switch = Pin(config['sensor_pin'], Pin.IN, Pin.PULL_UP) + wake_on_ext0(pin=pin_switch, level=Pin.WAKE_LOW) + + if wake_reason() == 0 and not config.get("config_button"): # Power on - Run on boot + sleep(10) # Give 10 seconds to cancel from serial + deepsleep() + elif wake_reason() == 0 and config.get("config_button"): # Power on - Wait for button before starting + pin_config = Pin(config["config_button"], Pin.IN, Pin.PULL_UP) + while True: + print('Waiting for button...') + if not pin_config.value(): + if config.get("config_led"): + led_blink(config["config_led"], 2, interval=0.2) + deepsleep() + sleep(1) + elif wake_reason() == 1: # Sensor triggered alarm (woke from deep sleep) + import alarm + import wifi + + wifi.connect() + alarm.trigger_alert(config["device_name"], phue=config.get("phue"), smtp=config.get("smtp")) + deepsleep() + +try: + main() +except KeyboardInterrupt: + pass +finally: + led_blink(config["config_led"], 2, interval=0.2) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..07784bf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +esptool +adafruit-ampy diff --git a/static/wemos-lolin-d32-magnetic_switch.fzz b/static/wemos-lolin-d32-magnetic_switch.fzz new file mode 100644 index 0000000..ccd901a Binary files /dev/null and b/static/wemos-lolin-d32-magnetic_switch.fzz differ diff --git a/static/wemos-lolin-d32-magnetic_switch.png b/static/wemos-lolin-d32-magnetic_switch.png new file mode 100644 index 0000000..b031060 Binary files /dev/null and b/static/wemos-lolin-d32-magnetic_switch.png differ