Home > OS >  Flask: JWT authentication not working while login
Flask: JWT authentication not working while login

Time:10-21

I've created a basic flask application in which I want to implement Login through JWT. It generates tokens for every USERNAME and PASSWORD, whether the entered value is right or wrong.

Plus, I'm unsure how it is to 'validate the data' at the time of login and 'check the password for a hash'.

I've seen many libraries for JWT in Flask I'm confused about which one to use as I'm using SQLAlchemy core(writing hardcoded SQL queries for API request).

app.py


from dataclasses import fields
import datetime
import json
from typing_extensions import Required
from sqlalchemy import DateTime
from email.policy import default, strict
from flask import Flask, jsonify, request, make_response
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from sqlalchemy.sql import text, func
from flask_marshmallow import Marshmallow
from marshmallow import Schema, fields, validates, ValidationError, validate
from sqlalchemy.sql import select, insert, bindparam
from sqlalchemy.dialects.postgresql import UUID
import uuid
from werkzeug.security import generate_password_hash,check_password_hash
from sqlalchemy import create_engine
import logging
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
from functools import wraps
import jwt

engine = create_engine('mysql://root:[email protected]:3306/flask')


app = Flask(__name__)

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:[email protected]:3306/flask'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'thisissecret'
#app.config['JWT_SECRET_KEY'] = 'super-secret' 
db = SQLAlchemy(app)
ma= Marshmallow(app)
migrate = Migrate(app, db)
# jwt = JWTManager(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_name = db.Column(db.String(100), nullable=False)
    password = db.Column(db.String(100), nullable=False)
    email_address = db.Column(db.String(80), unique=True, nullable=False)
    dob = db.Column(db.DateTime, nullable=True)
    address = db.Column(db.String(200))
    uid = db.Column(db.Text(length=16), nullable = False)

    def __init__(self, user_name, password, email_address, dob, address, uid):
        self.user_name = user_name
        self.password = password
        self.email_address = email_address
        self.dob = dob
        self.address = address
        self.uid = uid

# User Schema
class UserSchema(Schema):
    print("Scehma")

    user_name = fields.String(required=True, validate=validate.Length(min=5))
    password = fields.String()
    email_address = fields.String(required=True, error_messages={"required": " email_address is required. "})
    dob = fields.DateTime(required= True, error_messages={"required": " dob is required. "})
    address = fields.String()
    uid = fields.UUID( error_messages={'null': 'Sorry, Id field cannot be null', 'invalid_uuid': 'Sorry, Id field must be a valid UUID'})

    print("access")

    @validates('dob')
    def is_not_in_future(self,value):
        """'value' is the datetime parsed from time_created by marshmallow"""
        now = datetime.datetime.now()
        if value > now:
            raise ValidationError("Can't create notes in the future!")
        # if the function doesn't raise an error, the check is considered passed

    class Meta:
        # Fields to show when sending data
        fields = ('id','user_name', 'password', 'email_address', 'dob', 'address')   


#init Schema

user_schema = UserSchema()
users_scehma = UserSchema(many=True)





@app.route('/users', methods=['GET'])
# def users():
#     if request.method == 'GET':
#         all_users = User.query.all()
#         return users_scehma.dump(all_users)

def get_all_users():
    conn = engine.connect()
    str_sql = text("SELECT * FROM flask.`user`;")
    results = conn.execute(str_sql).fetchall()
    return users_scehma.dump(results)

@app.route('/register', methods=['POST'])

def user_register():
    print("entry")
    # request_data = request.get_json()
    # print("data :",request_data)
    errors = user_schema.validate(request.get_json())
    print("errors", errors)

    data_dict= user_schema.load(request.get_json())
    print("dict:",data_dict)

    conn = engine.connect()

    user_name =  data_dict['user_name']
    print(user_name)
    password = generate_password_hash(data_dict['password'], method = 'sha256')
    email_address = data_dict['email_address']
    dob = data_dict['dob']
    address = data_dict['address']
    uid = uuid.uuid4().hex
   
    print(uid)
    # print (user_name,password,email_address,dob,address)

    sql= text("""INSERT INTO user (user_name, password, email_address, dob, address, uid ) VALUES(:user_name , :password, :email_address, :dob, :address, :uid)""")
    data = ({"user_name": user_name, "password": password, "email_address": email_address, "dob": dob, "address": address, "uid": uid})
    print(sql)
    conn.execute(sql, data)
    return jsonify({'Message':'New user Created'})


@app.route('/login', methods=['POST'])
def login():
    print("login") 
    # request_data= user_schema.load(request.get_json()) 
    request_data = request.get_json()
    user_name=request_data['user_name']
    password=request_data['password']
    conn = engine.connect()
    data=conn.execute("""SELECT user_name, password FROM user WHERE user_name = user_name and password = password""",{"user_name": user_name, "password": password}).fetchone()
    # userdata = json.dumps(data.data)
    print(data)


     if data:
         access_token = create_access_token(identity=user_name)
         return jsonify(message='Login Successful', access_token=access_token)
     else:
         return jsonify('Bad email or Password'), 401

if __name__ == '__main__':
    app.run(debug=True)

CodePudding user response:

To handle the authentication process correctly you should define 2 methods before declaring the app = Flask(__name__) which are called authenticate and identity in JWT(app, authenticate, identity) `

from flask_jwt import JWT

def authenticate(username, password):
    user = username_table.get(username, None)
    if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
        return user

def identity(payload):
    user_id = payload['identity']
    return userid_table.get(user_id, None)

engine = create_engine('mysql://root:[email protected]:3306/flask')

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'super-secret'

jwt = JWT(app, authenticate, identity)

now you just have to enter the user data and make a request to your program to return the user data and secret token, additionally also specify the JWT payload handler that recieve data from identity method.

@jwt.payload_handler
def make_payload(identity):
    return {'user_id': identity.id}

Also check authentication_handler

@jwt.authentication_handler
def authenticate(username, password):
    user = User.query.filter(User.username == username).scalar()
    if bcrypt.check_password_hash(user.password, password):
        return user
  • Related