Browse Source

Initial commit

master
Ryan Reed 2 years ago
commit
0b8b7b9513
14 changed files with 1891 additions and 0 deletions
  1. +120
    -0
      .gitignore
  2. +179
    -0
      README.md
  3. +7
    -0
      boot.py
  4. +8
    -0
      config.py
  5. +156
    -0
      index.html
  6. +15
    -0
      library/leds.py
  7. +900
    -0
      library/microdot.py
  8. +329
    -0
      library/microdot_asyncio.py
  9. +47
    -0
      library/wifi.py
  10. +128
    -0
      main.py
  11. +2
    -0
      requirements.txt
  12. BIN
      static/wemos-lolin-d32.fzpz
  13. BIN
      static/wemos_lolin_d32-diagram.png
  14. BIN
      static/wemos_lolin_d32_pro-diagram.jpg

+ 120
- 0
.gitignore View File

@ -0,0 +1,120 @@
.ampy
# Phue Creds
bridge.dat
# Swap files
*.sw[a-p]
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

+ 179
- 0
README.md View File

@ -0,0 +1,179 @@
# Micropython Ikea Lights
# Table of Contents
<!-- vim-markdown-toc GFM -->
* [Overview](#overview)
* [Notes On ESP32 and ESP8266 Boards](#notes-on-esp32-and-esp8266-boards)
* [Requirements](#requirements)
* [Hardware Requirements](#hardware-requirements)
* [Software Requirements](#software-requirements)
* [Schematic/Connection](#schematicconnection)
* [Quick Start](#quick-start)
* [API Endpoints](#api-endpoints)
* [/shutdown](#shutdown)
* [/status](#status)
* [/lights/on](#lightson)
* [/lights/off](#lightsoff)
* [/lights/rgb](#lightsrgb)
* [/lights/preset/bounce](#lightspresetbounce)
* [/lights/preset/cycle](#lightspresetcycle)
* [/lights/preset/niagra](#lightspresetniagra)
* [/lights/preset/rainbow](#lightspresetrainbow)
* [Hardware](#hardware)
* [LOLIN D32](#lolin-d32)
* [PIN Layout](#pin-layout)
* [Fritzing Part](#fritzing-part)
* [Erase Firmware Note](#erase-firmware-note)
<!-- vim-markdown-toc -->
# Overview
This project is looking at using an [Ikea Vidja](https://www.ikea.com/us/en/p/vidja-floor-lamp-with-led-bulb-white-30416149/) floor lamp with RGB LEDs controlled via an ESP32 or ESP8266 board.
It can be controlled through a webpage or through API GET requests, such as IOS Shortcuts
This was initially following along with [this](https://www.instructables.com/DIY-WiFi-RGB-Floor-Lamp-Ikea-Hack/). I started working on getting web calls working to the controller, using [TinyWeb](https://github.com/belyalov/tinyweb). This worked except I ran into issues related to stopping a currently running preset (e.g. running LED loop). Lo and behold, [Bhavesh](https://bhave.sh/micropython-microdot/) posted an article solving this exact issue by using [microdot](https://github.com/miguelgrinberg/microdot). So many thanks for getting me on the right path there.
# Notes On ESP32 and ESP8266 Boards
I have found that the LED lights can be controlled using the ESP8266 boards but not particularly well. A few issues I noticed:
* When cycling through colors, blue would end up becoming red while only displaying blue colors for very brief moments or not at all
* Noticeable flickering of the LEDs
* Slow transitioning
The first 2 issues were likely due to the ESP8266 boards only supplying 3.3v, instead of that 5V that can be supplied by the E32s. The last issue might be due to the processing power behind chips or the 5V as well.
Best to use an ESP32 boards and not an ESP8266, although an ESP8266 can sort of work.
# Requirements
## Hardware Requirements
* [Ikea Vidja](https://www.ikea.com/us/en/p/vidja-floor-lamp-with-led-bulb-white-30416149/) floor lamp
* ESP32 Development Board - See [My Hardware](#my-hardware) for info on my setup
* NeoPixel Like LEDs - I'm using [BTF-Lighting WS2812B](https://smile.amazon.com/dp/B01CDTEJBG) lights
* 300-500Ω Resistor
* 1000uF Capacitor
## Software Requirements
* [microdot](https://github.com/miguelgrinberg/microdot) - The underlying asyncio flask-like webserver (Both files from [/src](https://github.com/miguelgrinberg/microdot/tree/main/src))
* **IMPORTANT**: These are not included as we should make sure to get the latest version from pip or github above
# Schematic/Connection
This is pretty straight forward and I'll add a picture later at some point. But:
* Pin 23 -> 300-500Ω Resistor -> Data pin of LEDs
* On my LEDs, the data pin is the center line
* 1000uF Capacitor between positive and negative of LED strip
* [Adafruit Best Practices](https://learn.adafruit.com/adafruit-neopixel-uberguide/best-practices) notes this should be as close to the LEDs as possible. I have mine pushed into the connector itself, similar to the picture on the Best Practices page
# Quick Start
Modify the `config.py` file with wifi, pin, and pixel settings
```
esptool.py --port /dev/ttyUSB0 erase_flash
esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash --flash_size=detect 0x1000 images/images/esp32-20220117-v1.18.bin
wget https://raw.githubusercontent.com/miguelgrinberg/microdot/main/src/microdot.py
wget https://raw.githubusercontent.com/miguelgrinberg/microdot/main/src/microdot_asyncio.py
ampy --port /dev/ttyUSB0 put microdot.py
ampy --port /dev/ttyUSB0 put microdot_asyncio.py
ampy --port /dev/ttyUSB0 put library/pixels.py
ampy --port /dev/ttyUSB0 put library/wifi.py
ampy --port /dev/ttyUSB0 put boot.py
ampy --port /dev/ttyUSB0 put config.py
ampy --port /dev/ttyUSB0 put main.py
ampy --port /dev/ttyUSB0 put index.html
# Reboot the microcontroller
```
At this point, you should be able to access the webpage on the esp32 at `http://IP/`
You can also use something like shortcuts to send GET requests to the various endpoints
# API Endpoints
## /shutdown
Shuts the microDot application down.
Note: It can only be brought back up by restarting the esp32 or through usb and `import main`
## /status
Provides memory and network information
## /lights/on
Sets rgb of the lights to 255, 255, 255
## /lights/off
Sets rgb of the lights to 0, 0, 0
## /lights/rgb
Allows for changing the color of the string of lights using RGB. Requires r,g,b arguments in the GET request
## /lights/preset/bounce
Accepts r,g,b arguments for the starting color
Light display sourced from [here](https://randomnerdtutorials.com/micropython-ws2812b-addressable-rgb-leds-neopixel-esp32-esp8266)
## /lights/preset/cycle
Accepts r,g,b arguments for the starting color
Light display sourced from [here](https://randomnerdtutorials.com/micropython-ws2812b-addressable-rgb-leds-neopixel-esp32-esp8266)
## /lights/preset/niagra
Light display sourced from [here](https://www.instructables.com/DIY-WiFi-RGB-Floor-Lamp-Ikea-Hack/)
## /lights/preset/rainbow
Light display sourced from [here](https://randomnerdtutorials.com/micropython-ws2812b-addressable-rgb-leds-neopixel-esp32-esp8266)
# Hardware
## LOLIN D32
I'm utilizing the Wemos [LOLIN D32 v1.0.0](https://www.aliexpress.com/item/WEMOS-LOLIN32-V1-0-0-wifi-bluetooth-board-based-ESP-32-4MB-FLASH/32808551116.html) however and ESP32 should work well here.
This is based on the ESP32 ([See datasheet](https://www.espressif.com/sites/default/files/documentation/esp32_datasheet_en.pdf))
| Description | Value |
| ------------------------- | -------------------------- |
| Microcontroller | ESP-32 |
| Board Power Supply (USB) | 5V |
| Supported Batteries | Lipo Battery 3.7V |
| Operating Voltage | 3.3V |
| Lithium battery interface | 500mA Max charging current |
| Digital I/O Pins | 22 |
| Analog Input Pins | 6 (VP, VN, 32, 33, 34, 35) |
| Analog Output Pins | 2 (25, 26) |
| LED BUILTIN | Pin 5 |
| Clock Speed(Max) | 240Mhz |
| Flash | 4M bytes |
| Length | 57mm |
| Width | 25.4mm |
| Weight | 6.1g |
### PIN Layout
![Wemos Lolin D32 Diagram](static/wemos_lolin_d32-diagram.png)
**[[Source](http://forum.hobbycomponents.com/viewtopic.php?f=111&t=2462)]**
### Fritzing Part
I created a [Fritzing](https://fritzing.com) part for the Wemos LOLIN D32.
[Download Here](static/wemos-lolin-d32.fzpz)
### Erase Firmware Note
If there is no boot/flash button and you get the error `Wrong boot mode detected (0x13)!` when attempting to erase the flash, try grounding Pin 0 and resetting.

+ 7
- 0
boot.py View File

@ -0,0 +1,7 @@
import gc
import wifi
status = wifi.connect()
print(f"Network Settings: {status}")
gc.collect() # Garbage collection

+ 8
- 0
config.py View File

@ -0,0 +1,8 @@
config = {}
config["wifi_ssid"] = ""
config["wifi_pass"] = ""
config["status_pin"] = 5
config["pixel_pin"] = 23
config["pixel_count"] = 70

+ 156
- 0
index.html View File

@ -0,0 +1,156 @@
<!DOCTYPE html>
<html>
<head>
<title>ESP32 Microdot Pixel Lights</title>
<meta charset="UTF-8">
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<script type="text/javascript">
var base_url = window.location.href.split("index.html")[0];
function http_get(url, callback) {
console.log("URL: " + url);
var xhr = new XMLHttpRequest();
xhr.addEventListener("load", function() {
callback(JSON.parse(xhr.response));
}, false);
xhr.open("GET", url);
xhr.send();
}
function show_alert(message) {
document.getElementById("alert").innerHTML = message;
document.getElementById("alert").style.display = "block";
}
function get_status() {
http_get(base_url + "status", function(response) {
document.getElementById("memory_total").innerHTML = response["memory"]["total"] + " B";
document.getElementById("memory_used").innerHTML = response["memory"]["alloc"] + " B";
document.getElementById("memory_free").innerHTML = response["memory"]["free"] + " B";
document.getElementById("network_ip").innerHTML = response["network"]["wifi"]["ip"];
document.getElementById("network_netmask").innerHTML = response["network"]["wifi"]["netmask"];
document.getElementById("network_gateway").innerHTML = response["network"]["wifi"]["gateway"];
document.getElementById("network_dns").innerHTML = response["network"]["wifi"]["dns"];
});
}
function run_light_preset(mode) {
http_get(base_url + "lights/preset/" + mode, function(response) {
show_alert(response["message"]);
});
}
function set_color() {
var r = document.getElementById("r").value;
var g = document.getElementById("g").value;
var b = document.getElementById("b").value;
var url = base_url + "lights/" + mode + "?r= " + r + "&g=" + g + "&b=" + b
http_get(url , function(response) {
show_alert(response["message"]);
});
}
function shutdown() {
show_alert("Server is shutting down");
http_get(base_url + "shutdown");
}
function toggle_lights(mode) {
http_get(base_url + "lights/" + mode, function(response) {
show_alert(response["message"]);
});
}
</script>
<style>
#status, #lights, #rgb { margin-bottom: 10px; }
</style>
</head>
<body onload="get_status();">
<div class="container-fluid">
<h1>ESP32 Microdot Pixel Lights</h1>
<div id="alert" class="alert alert-primary" role="alert" style="display: none;"></div>
<div id="status" class="row">
<div class="col">
<table class="table table-hover">
<thead>
<tr><th colspan="100%" style="text-align: center;">Network Settings</th></tr>
<tr><th>IP</th><th>Netmask</th><th>Gateway</th><th>DNS</th></tr>
</thead>
<tbody>
<tr>
<td id="network_ip"></td>
<td id="network_netmask"></td>
<td id="network_gateway"></td>
<td id="network_dns"></td>
</tr>
</tbody>
</table>
</div>
<div class="col">
<table class="table table-hover">
<thead>
<tr><th colspan="100%" style="text-align: center;">Controller Memory</th></tr>
<tr><th>Total</th><th>Used</th><th>Free</th></tr>
</thead>
<tbody>
<tr>
<td id="memory_total"></td>
<td id="memory_used"></td>
<td id="memory_free"></td>
</tr>
</body>
</table>
</div>
</div>
<div id="lights">
<p>
<h2>Pixel Lights</h2>
<button type="button" class="btn btn-lg btn-block btn-outline-success" onclick="toggle_lights('on');">💡 Light On</button>
<button type="button" class="btn btn-lg btn-block btn-outline-secondary" onclick="toggle_lights('off');">💡 Light Off</button>
<button type="button" class="btn btn-lg btn-block btn-outline-primary" onclick="set_color();">🟠 Set Color</button>
</p>
<p>
<h2>Preset Presets</h2>
<button type="button" class="btn btn-lg btn-block btn-outline-info" onclick="run_light_preset('bounce');">🏀 Bounce</button>
<button type="button" class="btn btn-lg btn-block btn-outline-info" onclick="run_light_preset('cycle');">🌀 Cycle</button>
<button type="button" class="btn btn-lg btn-block btn-outline-info" onclick="run_light_preset('niagra');">🌊 Niagra</button>
<button type="button" class="btn btn-lg btn-block btn-outline-info" onclick="run_light_preset('rainbow');">🌈 Rainbow</button>
</p>
</div>
<div id="rgb" class="form-row form-group">
<label class="col-form-label" for="r">R</label>
<div class="form-group col-sm-1">
<input name="r" id="r" class="form-control" type="text" placeholder="0" value="0" />
</div>
<label class="col-form-label" for="g">G</label>
<div class="form-group col-sm-1">
<input name="g" id="g" class="form-control" type="text" placeholder="0" value="0" />
</div>
<label class="col-form-label" for="b">B</label>
<div class="form-group col-sm-1">
<input name="b" id="b" class="form-control" type="text" placeholder="255" value="255" />
</div>
<small id="rgb_help" class="form-text text-muted">Used for a few presets or set color</small>
</div>
<hr>
<div id="shutdown" style="margin-top: 10px;">
<button type="button" class="btn btn-outline-danger btn-lg btn-block" onclick="shutdown();">🚫 Shutdown App</button>
<p style="margin: 10px;">Please Note: Shutting down the server will require restart of the microcontroller</p>
</div>
</div>
</body>
</html>

+ 15
- 0
library/leds.py View File

@ -0,0 +1,15 @@
from machine import Pin
from time import sleep
def blink(led_pin, count, delay=0.5):
led = Pin(led_pin, Pin.OUT) # Usually GPIO2
led.value(1) # Start off
x = 0
while x < count*2:
led.value(not led.value())
sleep(delay)
x = x + 1
def test(led_pin=5):
led_blink(led_pin, 3, delay=0.2)

+ 900
- 0
library/microdot.py View File

@ -0,0 +1,900 @@
"""
microdot
--------
The ``microdot`` module defines a few classes that help implement HTTP-based
servers for MicroPython and standard Python, with multithreading support for
Python interpreters that support it.
"""
try:
from sys import print_exception
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
traceback.print_exc()
try:
import uerrno as errno
except ImportError:
import errno
concurrency_mode = 'threaded'
try: # pragma: no cover
import threading
def create_thread(f, *args, **kwargs):
# use the threading module
threading.Thread(target=f, args=args, kwargs=kwargs).start()
except ImportError: # pragma: no cover
try:
import _thread
def create_thread(f, *args, **kwargs):
# use MicroPython's _thread module
def run():
f(*args, **kwargs)
_thread.start_new_thread(run, ())
except ImportError:
def create_thread(f, *args, **kwargs):
# no threads available, call function synchronously
f(*args, **kwargs)
concurrency_mode = 'sync'
try:
import ujson as json
except ImportError:
import json
try:
import ure as re
except ImportError:
import re
try:
import usocket as socket
except ImportError:
try:
import socket
except ImportError: # pragma: no cover
socket = None
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class MultiDict(dict):
"""A subclass of dictionary that can hold multiple values for the same
key. It is used to hold key/value pairs decoded from query strings and
form submissions.
:param initial_dict: an initial dictionary of key/value pairs to
initialize this object with.
Example::
>>> d = MultiDict()
>>> d['sort'] = 'name'
>>> d['sort'] = 'email'
>>> print(d['sort'])
'name'
>>> print(d.getlist('sort'))
['name', 'email']
"""
def __init__(self, initial_dict=None):
super().__init__()
if initial_dict:
for key, value in initial_dict.items():
self[key] = value
def __setitem__(self, key, value):
if key not in self:
super().__setitem__(key, [])
super().__getitem__(key).append(value)
def __getitem__(self, key):
return super().__getitem__(key)[0]
def get(self, key, default=None, type=None):
"""Return the value for a given key.
:param key: The key to retrieve.
:param default: A default value to use if the key does not exist.
:param type: A type conversion callable to apply to the value.
If the multidict contains more than one value for the requested key,
this method returns the first value only.
Example::
>>> d = MultiDict()
>>> d['age'] = '42'
>>> d.get('age')
'42'
>>> d.get('age', type=int)
42
>>> d.get('name', default='noname')
'noname'
"""
if key not in self:
return default
value = self[key]
if type is not None:
value = type(value)
return value
def getlist(self, key, type=None):
"""Return all the values for a given key.
:param key: The key to retrieve.
:param type: A type conversion callable to apply to the values.
If the requested key does not exist in the dictionary, this method
returns an empty list.
Example::
>>> d = MultiDict()
>>> d.getlist('items')
[]
>>> d['items'] = '3'
>>> d.getlist('items')
['3']
>>> d['items'] = '56'
>>> d.getlist('items')
['3', '56']
>>> d.getlist('items', type=int)
[3, 56]
"""
if key not in self:
return []
values = super().__getitem__(key)
if type is not None:
values = [type(value) for value in values]
return values
class Request():
"""An HTTP request class.
:var app: The application instance to which this request belongs.
:var client_addr: The address of the client, as a tuple (host, port).
:var method: The HTTP method of the request.
:var path: The path portion of the URL.
:var query_string: The query string portion of the URL.
:var args: The parsed query string, as a :class:`MultiDict` object.
:var headers: A dictionary with the headers included in the request.
:var cookies: A dictionary with the cookies included in the request.
:var content_length: The parsed ``Content-Length`` header.
:var content_type: The parsed ``Content-Type`` header.
:var stream: The input stream, containing the request body.
:var body: The body of the request, as bytes.
:var json: The parsed JSON body, as a dictionary or list, or ``None`` if
the request does not have a JSON body.
:var form: The parsed form submission body, as a :class:`MultiDict` object,
or ``None`` if the request does not have a form submission.
:var g: A general purpose container for applications to store data during
the life of the request.
"""
#: Specify the maximum payload size that is accepted. Requests with larger
#: payloads will be rejected with a 413 status code. Applications can
#: change this maximum as necessary.
#:
#: Example::
#:
#: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed
max_content_length = 16 * 1024
#: Specify the maximum payload size that can be stored in ``body``.
#: Requests with payloads that are larger than this size and up to
#: ``max_content_length`` bytes will be accepted, but the application will
#: only be able to access the body of the request by reading from
#: ``stream``. Set to 0 if you always access the body as a stream.
#:
#: Example::
#:
#: Request.max_body_length = 4 * 1024 # up to 4KB bodies read
max_body_length = 16 * 1024
#: Specify the maximum length allowed for a line in the request. Requests
#: with longer lines will not be correctly interpreted. Applications can
#: change this maximum as necessary.
#:
#: Example::
#:
#: Request.max_readline = 16 * 1024 # 16KB lines allowed
max_readline = 2 * 1024
class G:
pass
def __init__(self, app, client_addr, method, url, http_version, headers,
body=None, stream=None):
self.app = app
self.client_addr = client_addr
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
for header, value in self.headers.items():
header = header.lower()
if header == 'content-length':
self.content_length = int(value)
elif header == 'content-type':
self.content_type = value
elif header == 'cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self._body = body
self.body_used = False
self._stream = stream
self.stream_used = False
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(app, client_stream, client_addr):
"""Create a request object.
:param app: The Microdot application instance.
:param client_stream: An input stream from where the request data can
be read.
:param client_addr: The address of the client, as a tuple.
This method returns a newly created ``Request`` object.
"""
# request line
line = Request._safe_readline(client_stream).strip().decode()
if not line:
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
while True:
line = Request._safe_readline(client_stream).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
return Request(app, client_addr, method, url, http_version, headers,
stream=client_stream)
def _parse_urlencoded(self, urlencoded):
data = MultiDict()
for k, v in [pair.split('=', 1) for pair in urlencoded.split('&')]:
data[urldecode(k)] = urldecode(v)
return data
@property
def body(self):
if self.stream_used:
raise RuntimeError('Cannot use both stream and body')
if self._body is None:
self._body = b''
if self.content_length and \
self.content_length <= Request.max_body_length:
while len(self._body) < self.content_length:
data = self._stream.read(
self.content_length - len(self._body))
if len(data) == 0: # pragma: no cover
raise EOFError()
self._body += data
self.body_used = True
return self._body
@property
def stream(self):
if self.body_used:
raise RuntimeError('Cannot use both stream and body')
self.stream_used = True
return self._stream
@property
def json(self):
if self._json is None:
if self.content_type is None:
return None
mime_type = self.content_type.split(';')[0]
if mime_type != 'application/json':
return None
self._json = json.loads(self.body.decode())
return self._json
@property
def form(self):
if self._form is None:
if self.content_type is None:
return None
mime_type = self.content_type.split(';')[0]
if mime_type != 'application/x-www-form-urlencoded':
return None
self._form = self._parse_urlencoded(self.body.decode())
return self._form
@staticmethod
def _safe_readline(stream):
line = stream.readline(Request.max_readline + 1)
if len(line) > Request.max_readline:
raise ValueError('line too long')
return line
class Response():
"""An HTTP response class.
:param body: The body of the response. If a dictionary or list is given,
a JSON formatter is used to generate the body.
:param status_code: The numeric HTTP status code of the response. The
default is 200.
:param headers: A dictionary of headers to include in the response.
:param reason: A custom reason phrase to add after the status code. The
default is "OK" for responses with a 200 status code and
"N/A" for any other status codes.
"""
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
send_file_buffer_size = 1024
def __init__(self, body='', status_code=200, headers=None, reason=None):
self.status_code = status_code
self.headers = headers.copy() if headers else {}
self.reason = reason
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
else:
# this applies to bytes or file-like objects
self.body = body
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
"""Add a cookie to the response.
:param cookie: The cookie's name.
:param value: The cookie's value.
:param path: The cookie's path.
:param domain: The cookie's domain.
:param expires: The cookie expiration time, as a ``datetime`` object.
:param max_age: The cookie's ``Max-Age`` value.
:param secure: The cookie's ``secure`` flag.
:param http_only: The cookie's ``HttpOnly`` flag.
"""
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; HttpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def complete(self):
if isinstance(self.body, bytes) and \
'Content-Length' not in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
reason = self.reason if self.reason is not None else \
('OK' if self.status_code == 200 else 'N/A')
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code, reason=reason).encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
stream.write(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
stream.write(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no cover
self.body.close()
else:
stream.write(self.body)
@classmethod
def redirect(cls, location, status_code=302):
"""Return a redirect response.
:param location: The URL to redirect to.
:param status_code: The 3xx status code to use for the redirect. The
default is 302.
"""
if '\x0d' in location or '\x0a' in location:
raise ValueError('invalid redirect URL')
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
"""Send file contents in a response.
:param filename: The filename of the file.
:param status_code: The 3xx status code to use for the redirect. The
default is 302.
:param content_type: The ``Content-Type`` header to use in the
response. If omitted, it is generated
automatically from the file extension.
Security note: The filename is assumed to be trusted. Never pass
filenames provided by the user before validating and sanitizing them
first.
"""
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.rsplit(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]+'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.+'
elif type_.startswith('re:'):
pattern = type_[3:]
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
"""An HTTP application class.
This class implements an HTTP application instance and is heavily
influenced by the ``Flask`` class of the Flask framework. It is typically
declared near the start of the main application script.
Example::
from microdot import Microdot
app = Microdot()
"""
def __init__(self):
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.debug = False
self.server = None
def route(self, url_pattern, methods=None):
"""Decorator that is used to register a function as a request handler
for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
:param methods: The list of HTTP methods to be handled by the
decorated function. If omitted, only ``GET`` requests
are handled.
The URL pattern can be a static path (for example, ``/users`` or
``/api/invoices/search``) or a path with dynamic components enclosed
in ``<`` and ``>`` (for example, ``/users/<id>`` or
``/invoices/<number>/products``). Dynamic path components can also
include a type prefix, separated from the name with a colon (for
example, ``/users/<int:id>``). The type can be ``string`` (the
default), ``int``, ``path`` or ``re:[regular-expression]``.
The first argument of the decorated function must be
the request object. Any path arguments that are specified in the URL
pattern are passed as keyword arguments. The return value of the
function must be a :class:`Response` instance, or the arguments to
be passed to this class.
Example::
@app.route('/')
def index(request):
return 'Hello, world!'
"""
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def get(self, url_pattern):
"""Decorator that is used to register a function as a ``GET`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['GET']``.
Example::
@app.get('/users/<int:id>')
def get_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['GET'])
def post(self, url_pattern):
"""Decorator that is used to register a function as a ``POST`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the``route`` decorator with
``methods=['POST']``.
Example::
@app.post('/users')
def create_user(request):
# ...
"""
return self.route(url_pattern, methods=['POST'])
def put(self, url_pattern):
"""Decorator that is used to register a function as a ``PUT`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['PUT']``.
Example::
@app.put('/users/<int:id>')
def edit_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['PUT'])
def patch(self, url_pattern):
"""Decorator that is used to register a function as a ``PATCH`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['PATCH']``.
Example::
@app.patch('/users/<int:id>')
def edit_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['PATCH'])
def delete(self, url_pattern):
"""Decorator that is used to register a function as a ``DELETE``
request handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['DELETE']``.
Example::
@app.delete('/users/<int:id>')
def delete_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['DELETE'])
def before_request(self, f):
"""Decorator to register a function to run before each request is
handled. The decorated function must take a single argument, the
request object.
Example::
@app.before_request
def func(request):
# ...
"""
self.before_request_handlers.append(f)
return f
def after_request(self, f):
"""Decorator to register a function to run after each request is
handled. The decorated function must take two arguments, the request
and response objects. The return value of the function must be an
updated response object.
Example::
@app.before_request
def func(request, response):
# ...
"""
self.after_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
"""Decorator to register a function as an error handler. Error handler
functions for numeric HTTP status codes must accept a single argument,
the request object. Error handler functions for Python exceptions
must accept two arguments, the request object and the exception
object.
:param status_code_or_exception_class: The numeric HTTP status code or
Python exception class to
handle.
Examples::
@app.errorhandler(404)
def not_found(request):
return 'Not found'
@app.errorhandler(RuntimeError)
def runtime_error(request, exception):
return 'Runtime error'
"""
def decorated(f):
self.error_handlers[status_code_or_exception_class] = f
return f
return decorated
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start the web server. This function does not normally return, as
the server enters an endless listening loop. The :func:`shutdown`
function provides a method for terminating the server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
Example::
from microdot import Microdot
app = Microdot()
@app.route('/')
def index():
return 'Hello, world!'
app.run(debug=True)
"""
self.debug = debug
self.shutdown_requested = False
self.server = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
if self.debug: # pragma: no cover
print('Starting {mode} server on {host}:{port}...'.format(
mode=concurrency_mode, host=host, port=port))
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(addr)
self.server.listen(5)
while not self.shutdown_requested:
try:
sock, addr = self.server.accept()
except OSError as exc: # pragma: no cover
if exc.args[0] == errno.ECONNABORTED:
break
else:
raise
create_thread(self.dispatch_request, sock, addr)
def shutdown(self):
"""Request a server shutdown. The server will then exit its request
listening loop and the :func:`run` function will return. This function
can be safely called from a route handler, as it only schedules the
server to terminate as soon as the request completes.
Example::
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
"""
self.shutdown_requested = True
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def dispatch_request(self, sock, addr):
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = None
try:
req = Request.create(self, stream, addr)
except Exception as exc: # pragma: no cover
print_exception(exc)
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = self.error_handlers[413](req)
else:
res = 'Payload too large', 413
else:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
else:
res = 'Bad request', 400
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
res.write(stream)
stream.close()
if stream != sock: # pragma: no cover
sock.close()
if self.shutdown_requested: # pragma: no cover
self.server.close()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
redirect = Response.redirect
send_file = Response.send_file

+ 329
- 0
library/microdot_asyncio.py View File

@ -0,0 +1,329 @@
"""
microdot_asyncio
----------------
The ``microdot_asyncio`` module defines a few classes that help implement
HTTP-based servers for MicroPython and standard Python that use ``asyncio``
and coroutines.
"""
try:
import uasyncio as asyncio
except ImportError:
import asyncio
try:
import uio as io
except ImportError:
import io
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
def _iscoroutine(coro):
return hasattr(coro, 'send') and hasattr(coro, 'throw')
class _AsyncBytesIO:
def __init__(self, data):
self.stream = io.BytesIO(data)
async def read(self, n=-1):
return self.stream.read(n)
async def readline(self): # pragma: no cover
return self.stream.readline()
async def readexactly(self, n): # pragma: no cover
return self.stream.read(n)
async def readuntil(self, separator=b'\n'): # pragma: no cover
return self.stream.readuntil(separator=separator)
class Request(BaseRequest):
@staticmethod
async def create(app, client_stream, client_addr):
"""Create a request object.
:param app: The Microdot application instance.
:param client_stream: An input stream from where the request data can
be read.
:param client_addr: The address of the client, as a tuple.
This method is a coroutine. It returns a newly created ``Request``
object.
"""
# request line
line = (await Request._safe_readline(client_stream)).strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = (await Request._safe_readline(
client_stream)).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header.lower() == 'content-length':
content_length = int(value)
# body
body = b''
print(Request.max_body_length)
if content_length and content_length <= Request.max_body_length:
body = await client_stream.readexactly(content_length)
stream = None
else:
body = b''
stream = client_stream
return Request(app, client_addr, method, url, http_version, headers,
body=body, stream=stream)
@property
def stream(self):
if self._stream is None:
self._stream = _AsyncBytesIO(self._body)
return self._stream
@staticmethod
async def _safe_readline(stream):
line = (await stream.readline())
if len(line) > Request.max_readline:
raise ValueError('line too long')
return line
class Response(BaseResponse):
"""An HTTP response class.
:param body: The body of the response. If a dictionary or list is given,
a JSON formatter is used to generate the body.
:param status_code: The numeric HTTP status code of the response. The
default is 200.
:param headers: A dictionary of headers to include in the response.
:param reason: A custom reason phrase to add after the status code. The
default is "OK" for responses with a 200 status code and
"N/A" for any other status codes.
"""
async def write(self, stream):
self.complete()
# status code
reason = self.reason if self.reason is not None else \
('OK' if self.status_code == 200 else 'N/A')
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code, reason=reason).encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
await stream.awrite('{header}: {value}\r\n'.format(
header=header, value=value).encode())
await stream.awrite(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
await stream.awrite(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no cover
self.body.close()
else:
await stream.awrite(self.body)
class Microdot(BaseMicrodot):
async def start_server(self, host='0.0.0.0', port=5000, debug=False):
"""Start the Microdot web server as a coroutine. This coroutine does
not normally return, as the server enters an endless listening loop.
The :func:`shutdown` function provides a method for terminating the
server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
This method is a coroutine.
Example::
import asyncio
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def index():
return 'Hello, world!'
async def main():
await app.start_server(debug=True)
asyncio.run(main())
"""
self.debug = debug
async def serve(reader, writer):
if not hasattr(writer, 'awrite'): # pragma: no cover
# CPython provides the awrite and aclose methods in 3.8+
async def awrite(self, data):
self.write(data)
await self.drain()
async def aclose(self):
self.close()
await self.wait_closed()
from types import MethodType
writer.awrite = MethodType(awrite, writer)
writer.aclose = MethodType(aclose, writer)
await self.dispatch_request(reader, writer)
if self.debug: # pragma: no cover
print('Starting async server on {host}:{port}...'.format(
host=host, port=port))
self.server = await asyncio.start_server(serve, host, port)
while True:
try:
await self.server.wait_closed()
break
except AttributeError: # pragma: no cover
# the task hasn't been initialized in the server object yet
# wait a bit and try again
await asyncio.sleep(0.1)
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start the web server. This function does not normally return, as
the server enters an endless listening loop. The :func:`shutdown`
function provides a method for terminating the server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
Example::
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def index():
return 'Hello, world!'
app.run(debug=True)
"""
asyncio.run(self.start_server(host=host, port=port, debug=debug))
def shutdown(self):
self.server.close()
async def dispatch_request(self, reader, writer):
req = None
try:
req = await Request.create(self, reader,
writer.get_extra_info('peername'))
except Exception as exc: # pragma: no cover
print_exception(exc)
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[413], req)
else:
res = 'Payload too large', 413
else:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = await self._invoke_handler(handler, req)
if res:
break
if res is None:
res = await self._invoke_handler(
f, req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
elif 404 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[404], req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
else:
res = 'Bad request', 400
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
await res.write(writer)
await writer.aclose()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
ret = f_or_coro(*args, **kwargs)
if _iscoroutine(ret):
ret = await ret
return ret
redirect = Response.redirect
send_file = Response.send_file

+ 47
- 0
library/wifi.py View File

@ -0,0 +1,47 @@
import network
try:
from config import config
except ImportError:
config = {}
ap_if = network.WLAN(network.AP_IF)
wifi = network.WLAN(network.STA_IF)
def connect(enable_ap=False):
ap_if.active(enable_ap)
if not config.get("wifi_ssid") or not config.get("wifi_pass"):
print("WARNING: wifi_ssid or wifi_pass not configured in config.py")
return status()
if not wifi.isconnected():
wifi.active(True)
wifi.connect(config["wifi_ssid"], config["wifi_pass"])
while not wifi.isconnected():
pass
return status()
def disconnect():
wifi = network.WLAN(network.STA_IF)
wifi.active(False)
def status():
results = {
"ap": ap_if.ifconfig()[0] != "0.0.0.0",
"wifi": {
"ip": "0.0.0.0",
"netmask": "0.0.0.0",
"gateway": "0.0.0.0",
"dns": "0.0.0.0",
}
}
if wifi.isconnected():
ifconfig = wifi.ifconfig()
results["wifi"] = {
"ip": ifconfig[0],
"netmask": ifconfig[1],
"gateway": ifconfig[2],
"dns": ifconfig[3],
}
return results

+ 128
- 0
main.py View File

@ -0,0 +1,128 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
import gc
from machine import Pin
from microdot_asyncio import Microdot, Response, send_file
from pixels import Pixels
from wifi import status as network_status
try:
from config import config
except ImportError:
raise Exception("Cannot open/find config.py")
app = Microdot()
current_task = None
try:
p = Pixels(config["pixel_pin"], config["pixel_count"])
except KeyError:
raise ValueError("'pixel_pin' or 'pixel_count' does not exist in config.py")
@app.before_request
async def pre_request_handler(request):
# Cancel any running preset if light related function is called
if request.path.startswith("/lights/") and current_task:
current_task.cancel()
@app.route("/")
async def hello(request):
return send_file("index.html")
@app.route("/shutdown")
async def shutdown(request):
print("Shutting Down App")
if current_task:
current_task.cancel()
request.app.shutdown()
gc.collect()
return {"message": "The server is shutting down..."}
@app.route("/status")
async def status(request):
results = {
"memory": {
"alloc": gc.mem_alloc(),
"free": gc.mem_free(),
"total": gc.mem_alloc() + gc.mem_free(),
},
"network": network_status(),
}
return results
@app.route("/lights/off")
async def lights_off(request):
p.clear()
return {"message": "Lights turned off"}
@app.route("/lights/on")
async def lights_on(request):
p.set_color(r=255, g=255, b=255)
return {"message": "Lights turned on"}
@app.route("/lights/rgb")
async def lights_rgb(request):
args = rgb_args(request.args)
p.set_color(**args)
return {"message": f"Light colors set: {args}"}
@app.route("/lights/preset/bounce")
async def lights_preset_bounce(request):
global current_task
args = rgb_args(request.args)
args["cycles"] = request.args.get("cycles", -1)
current_task = asyncio.create_task(p.preset_bounce(**args))
return {"message": "Running light preset: Bounce"}
@app.route("/lights/preset/cycle")
async def lights_preset_cycle(request):
global current_task
args = rgb_args(request.args)
args["cycles"] = request.args.get("cycles", -1)
current_task = asyncio.create_task(p.preset_cycle(**args))
return {"message": "Running light preset: Cycle"}
@app.route("/lights/preset/niagra")
async def lights_preset_niagra(request):
global current_task
cycles = request.args.get("cycles", -1)
current_task = asyncio.create_task(p.preset_niagra(cycles=cycles))
return {"message": "Running light preset: Niagra"}
@app.route("/lights/preset/rainbow")
async def lights_preset_rainbow(request):
global current_task
cycles = request.args.get("cycles", -1)
current_task = asyncio.create_task(p.preset_rainbow(cycles=cycles))
return {"message": "Running light preset: Rainbow"}
def rgb_args(args_in):
args = {}
for key in args_in.keys():
try:
args[key] = int(args_in[key])
except ValueError:
continue
return args
def start_server():
print("Starting Microdot App")
try:
app.run(port=80)
except KeyboardInterrupt:
print("Shutting Down App")
app.shutdown()
gc.collect()
start_server()

+ 2
- 0
requirements.txt View File

@ -0,0 +1,2 @@
esptool
adafruit-ampy

BIN
static/wemos-lolin-d32.fzpz View File


BIN
static/wemos_lolin_d32-diagram.png View File

Before After
Width: 1024  |  Height: 831  |  Size: 253 KiB

BIN
static/wemos_lolin_d32_pro-diagram.jpg View File

Before After
Width: 1600  |  Height: 900  |  Size: 702 KiB

Loading…
Cancel
Save