from app import db, login
|
|
from datetime import datetime, timedelta
|
|
from flask import current_app, url_for
|
|
from flask_login import UserMixin
|
|
from time import time
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import base64
|
|
import json
|
|
import jwt
|
|
import os
|
|
|
|
|
|
class PaginatedAPIMixin(object):
|
|
@staticmethod
|
|
def to_collection_dict(query, page, per_page, endpoint, **kwargs):
|
|
resources = query.paginate(page, per_page, False)
|
|
data = {
|
|
'items': [item.to_dict() for item in resources.items],
|
|
'_meta': {
|
|
'page': page,
|
|
'per_page': per_page,
|
|
'total_pages': resources.pages,
|
|
'total_items': resources.total
|
|
},
|
|
'_links': {
|
|
'self': url_for(endpoint, page=page, per_page=per_page,
|
|
**kwargs),
|
|
'next': url_for(endpoint, page=page, per_page=per_page,
|
|
**kwargs) if resources.has_next else None,
|
|
'prev': url_for(endpoint, page=page, per_page=per_page,
|
|
**kwargs) if resources.has_prev else None,
|
|
}
|
|
}
|
|
return data
|
|
|
|
class User(PaginatedAPIMixin, UserMixin, db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(64), index=True, unique=True)
|
|
email = db.Column(db.String(120), index=True, unique=True)
|
|
password_hash = db.Column(db.String(128))
|
|
last_seen = db.Column(db.DateTime, default=datetime.utcnow)
|
|
role = db.Column(db.String(32), index=True)
|
|
token = db.Column(db.String(32), index=True, unique=True)
|
|
token_expiration = db.Column(db.DateTime)
|
|
|
|
def __repr__(self):
|
|
return f"<User {self.username}>"
|
|
|
|
def set_password(self, password):
|
|
"""
|
|
Set the password_hash field
|
|
|
|
:param password:
|
|
"""
|
|
self.password_hash = generate_password_hash(password)
|
|
|
|
def check_password(self, password):
|
|
"""
|
|
Check the password against the hash in the database
|
|
|
|
:param password:
|
|
returns: True/False
|
|
"""
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
def from_dict(self, data, new_user=False):
|
|
for field in ['username', 'email']:
|
|
if field in data:
|
|
setattr(self, field, data[field])
|
|
if new_user and 'password' in data:
|
|
self.set_password(data['password'])
|
|
|
|
def to_dict(self, include_email=False):
|
|
data = {
|
|
'id': self.id,
|
|
'username': self.username,
|
|
'last_seen': self.last_seen.isoformat() + 'Z',
|
|
'_links': {
|
|
'self': url_for('api.get_user', id=self.id),
|
|
}
|
|
}
|
|
if include_email:
|
|
data['email'] = self.email
|
|
return data
|
|
|
|
def get_token(self, expires_in=3600):
|
|
now = datetime.utcnow()
|
|
if self.token and self.token_expiration > now + timedelta(seconds=60):
|
|
return self.token
|
|
self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
|
|
self.token_expiration = now + timedelta(seconds=expires_in)
|
|
db.session.add(self)
|
|
return self.token
|
|
|
|
def revoke_token(self):
|
|
self.token_expiration = datetime.utcnow() - timedelta(seconds=1)
|
|
|
|
@staticmethod
|
|
def check_token(token):
|
|
user = User.query.filter_by(token=token).first()
|
|
if user is None or user.token_expiration < datetime.utcnow():
|
|
return None
|
|
return user
|
|
|
|
@staticmethod
|
|
def verify_reset_password_token(token):
|
|
try:
|
|
id = jwt.decode(token, current_app.config['SECRET_KEY'],
|
|
algorithms=['HS256'])['reset_password']
|
|
except:
|
|
return
|
|
return User.query.get(id)
|
|
|
|
@login.user_loader
|
|
def load_user(id):
|
|
# Necessary for flask-login to work
|
|
return User.query.get(int(id))
|