import base64
import json
import os
from datetime import datetime, timedelta
from time import time

import jwt
from flask import current_app, url_for
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash

from app import db, login


class PaginatedAPIMixin(object):
    """Mixin that renders a paginated query as an API collection dict."""

    @staticmethod
    def to_collection_dict(query, page, per_page, endpoint, **kwargs):
        """
        Build the collection representation for one page of ``query``.

        :param query: query object exposing ``paginate()``
        :param page: 1-based number of the page to return
        :param per_page: number of items per page
        :param endpoint: endpoint name passed to ``url_for`` for the links
        :param kwargs: extra arguments forwarded to ``url_for``
        returns: dict with ``items``, ``_meta`` and ``_links`` keys
        """
        # Keyword arguments work with both Flask-SQLAlchemy 2.x and 3.x;
        # the positional form was removed in 3.0.
        resources = query.paginate(
            page=page, per_page=per_page, error_out=False
        )

        def page_url(number):
            return url_for(endpoint, page=number, per_page=per_page, **kwargs)

        return {
            'items': [item.to_dict() for item in resources.items],
            '_meta': {
                'page': page,
                'per_page': per_page,
                'total_pages': resources.pages,
                'total_items': resources.total,
            },
            '_links': {
                'self': page_url(page),
                # Neighbouring pages, or None when there is no such page.
                'next': page_url(page + 1) if resources.has_next else None,
                'prev': page_url(page - 1) if resources.has_prev else None,
            },
        }


class User(PaginatedAPIMixin, UserMixin, db.Model):
    """User account model with password and API token helpers."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True)
    email = db.Column(db.String(120), index=True, unique=True)
    password_hash = db.Column(db.String(128))
    last_seen = db.Column(db.DateTime, default=datetime.utcnow)
    role = db.Column(db.String(32), index=True)
    # API token: 24 random bytes, base64-encoded to 32 characters.
    token = db.Column(db.String(32), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)

    def __repr__(self):
        return f"<User {self.username}>"

    def set_password(self, password):
        """
        Set the password_hash field
        :param password: plain-text password to hash and store
        """
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """
        Check the password against the hash in the database
        :param password: plain-text password to verify
        returns: True/False
        """
        # An account without a stored hash can never match a password.
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def from_dict(self, data, new_user=False):
        """
        Update this user from a dict

        Copies ``username`` and ``email`` when present. The password is
        only set when ``new_user`` is true and ``data`` has a ``password``.
        :param data: dict of incoming field values
        :param new_user: True when the user is being created
        """
        for field in ('username', 'email'):
            if field in data:
                setattr(self, field, data[field])
        if new_user and 'password' in data:
            self.set_password(data['password'])

    def to_dict(self, include_email=False):
        """
        Return the API representation of this user
        :param include_email: add the ``email`` key when True
        returns: dict with id, username, last_seen and _links
        """
        # last_seen is filled by the column default (datetime.utcnow), so
        # it can still be None on an instance that was never flushed.
        last_seen = None
        if self.last_seen is not None:
            last_seen = self.last_seen.isoformat() + 'Z'
        data = {
            'id': self.id,
            'username': self.username,
            'last_seen': last_seen,
            '_links': {
                'self': url_for('api.get_user', id=self.id),
            },
        }
        if include_email:
            data['email'] = self.email
        return data

    def get_token(self, expires_in=3600):
        """
        Return the current API token, creating a new one when needed

        The existing token is reused while it has more than 60 seconds of
        validity left. A new token is added to the session but not
        committed here.
        :param expires_in: lifetime of a new token in seconds
        returns: the token string
        """
        now = datetime.utcnow()
        still_valid = (
            self.token
            and self.token_expiration is not None
            and self.token_expiration > now + timedelta(seconds=60)
        )
        if still_valid:
            return self.token
        self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def revoke_token(self):
        """Expire the current token by moving its expiration into the past."""
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    @staticmethod
    def check_token(token):
        """
        Look up the user owning a non-expired token
        :param token: token string presented by the client
        returns: the User, or None if the token is unknown or expired
        """
        # filter_by(token=None) would compile to "token IS NULL" and could
        # match a user who never had a token, so reject empty input first.
        if not token:
            return None
        user = User.query.filter_by(token=token).first()
        if user is None or user.token_expiration is None:
            return None
        if user.token_expiration < datetime.utcnow():
            return None
        return user

    @staticmethod
    def verify_reset_password_token(token):
        """
        Resolve a password-reset JWT to its user
        :param token: JWT carrying a ``reset_password`` claim
        returns: the User, or None if the token cannot be decoded
        """
        try:
            payload = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'],
            )
            user_id = payload['reset_password']
        except Exception:
            # Any decode or claim failure means the token is not usable.
            return None
        return User.query.get(user_id)


@login.user_loader
def load_user(id):
    # Necessary for flask-login to work
    return User.query.get(int(id))