Python学习笔记(6)---OAut

发布时间:2019-08-28 09:11:06编辑:auto阅读(1516)

    OAuth: (开放授权)


    wKiom1WEMbKC3Nb5AAIMnP63WMs308.jpg

    OAuth的授权模式:

    • 授权码模式: 功能最完善,流程最严密

    • 简码模式: 不通过第三方应用程序服务器,直接在浏览器中向认证服务器申请指令

    • 密码模式:用户向客户端提供用户名和密码

    • 客户端模式:



    OAuth授权服务器:

    在logindemo.py中添加:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import base64
    import random
    import time
    
    from flask import Flask, request, redirect
    
    app = Flask(__name__)
    
    users = {
        "xxxx": ["xxxxx"]
    }
    
    auth_code = {}
    
    redirect_uri='http://localhost:5000/client/passport'
    # 给用户添加账号
    client_id = 'xxxxxx'
    users[client_id] = []
    # 授权服务器需要保存重定向uri
    oauth_redirect_uri = []
    
    def gen_token(uid):
        token = base64.b64encode(':'.join([str(uid), str(random.random()), str(time.time() + 7200)]))
        users[uid].append(token)
        return token
    
    
    # 生成授权码:
    def gen_auth_code(uri):
        code = random.randint(0, 10000)
        auth_code[code] = uri
        return code
    
    
    def verify_token(token):
        _token = base64.b64decode(token)
        if not users.get(_token.split(':')[0])[-1] == token:
            return -1
        if float(_token.split(':')[-1]) >= time.time():
            return 1
        else:
            return 0
    
    @app.route('/', methods=['GET'])
    def index():
        print request.headers
        return 'hello'
    
    @app.route('/login', methods=['GET'])
    def login():
        uid, pw = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).split(':')
        if users.get(uid)[0] == pw:
            return gen_token(uid)
        else:
            return 'error'
    
    
    #授权码的发放:
    @app.route('/oauth', methods=['GET'])
    def oauth():
        # 验证用户授权
        if request.args.get('user'):
            if users.get(request.args.get('user'))[0] == request.args.get('pw') and oauth_redirect_uri:
                uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0])
                return redirect(uri)
        if request.args.get('code'): # 若请求中携带授权码,
            # 比对uri
            if auth_code.get(int(request.args.get('code'))) == request.args.get('redirect_uri'):
                return gen_token(request.args.get('client_id'))# 发放token
        if request.args.get('redirect_uri'):
            oauth_redirect_uri.append(request.args.get('redirect_uri'))
        return 'please login'
    
    
    # 重定向
    # 用户访问客户端的login目录,客户端将用户重定向到授权服务端的oauth
    @app.route('/client/login', methods=['GET'])
    def client_login():
        uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (
            client_id, redirect_uri)
        return redirect(uri)
    
    
    @app.route('/client/passport', methods=['POST', 'GET'])
    def client_passport():
        code = request.args.get('code')
        uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)
        return redirect(uri)
    
    @app.route('/test1', methods=['GET'])
    def test():
        token = request.args.get('token')
        if verify_token(token) == 1:
            return 'data'
        else:
            return 'error'
    
    
    
    
    
    if __name__ == '__main__':
        app.run(debug=True)


    在requests_t.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import requests
    
    
    
    r = requests.get('http://localhost:5000/client/login')
    print r.text
    print r.history
    print r.url
    uri_login = r.url.split('?')[0] + '?user=zx&pw=thystar'
    r2 = requests.get(uri_login)
    print r2.text
    r = requests.get('http://127.0.0.1:5000/test1', params={'token': r2.text})
    print r.text




    Flask渲染页面集Cookies;

    Cookies的加密方法:

    对源代码的修改:

    logindemo.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import base64
    import random
    import time
    import json
    import hmac
    from datetime import datetime, timedelta
    
    from flask import Flask, request, redirect, make_response
    
    app = Flask(__name__)
    
    users = {
        "zx": ["thystar"]
    }
    
    redirect_uri='http://localhost:5000/client/passport'
    client_id = 'thystar'
    users[client_id] = []
    auth_code = {}
    
    oauth_redirect_uri = []
    
    TIMEOUT = 3600 * 2
    
    
    # 新版本的token生成器
    def gen_token(data):
        '''
        :param data: dict type
        :return: base64 str
        '''
        data = data.copy()
        if "salt" not in data:
            data["salt"] = unicode(random.random()).decode("ascii")
        if "expires" not in data:
            data["expires"] = time.time() + TIMEOUT
        payload = json.dumps(data).encode("utf8")
        # 生成签名
        sig = _get_signature(payload)
        return encode_token_bytes(payload + sig)
    
    # 授权码生成器
    def gen_auth_code(uri, user_id):
        code = random.randint(0,10000)
        auth_code[code] = [uri, user_id]
        return code
    
    # 新版本的token验证
    def verify_token(token):
        '''
        :param token: base64 str
        :return: dict type
        '''
        decoded_token = decode_token_bytes(str(token))
        payload = decoded_token[:-16]
        sig = decoded_token[-16:]
        # 生成签名
        expected_sig = _get_signature(payload)
        if sig != expected_sig:
            return {}
        data = json.loads(payload.decode("utf8"))
        if data.get('expires') >= time.time():
            return data
        return 0
    
    # 使用hmac为消息生成签名
    def _get_signature(value):
        """Calculate the HMAC signature for the given value."""
        return hmac.new('secret123456', value).digest()
    
    # 下面两个函数将base64编码和解码单独封装
    def encode_token_bytes(data):
        return base64.urlsafe_b64encode(data)
    
    def decode_token_bytes(data):
        return base64.urlsafe_b64decode(data)
    
    
    # 验证服务器端
    @app.route('/index', methods=['POST', 'GET'])
    def index():
        print request.headers
        return 'hello'
    
    @app.route('/login', methods=['POST', 'GET'])
    def login():
        uid, pw = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).split(':')
        if users.get(uid)[0] == pw:
            return gen_token(dict(user=uid, pw=pw))
        else:
            return 'error'
    
    @app.route('/oauth', methods=['POST', 'GET'])
    def oauth():
        # 处理表单登录, 同时设置Cookie
        if request.method == 'POST' and request.form['user']:
            u = request.form['user']
            p = request.form['pw']
            if users.get(u)[0] == p and oauth_redirect_uri:
                uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0], u)
                expire_date = datetime.now() + timedelta(minutes=1)
                resp = make_response(redirect(uri))
                resp.set_cookie('login', '_'.join([u, p]), expires=expire_date)
                return resp
        # 验证授权码,发放token
        if request.args.get('code'):
            auth_info = auth_code.get(int(request.args.get('code')))
            if auth_info[0] == request.args.get('redirect_uri'):
                # 可以在授权码的auth_code中存储用户名,编进token中
                return gen_token(dict(client_id=request.args.get('client_id'), user_id=auth_info[1]))
        # 如果登录用户有Cookie,则直接验证成功,否则需要填写登录表单
        if request.args.get('redirect_uri'):
            oauth_redirect_uri.append(request.args.get('redirect_uri'))
            if request.cookies.get('login'):
                u, p = request.cookies.get('login').split('_')
                if users.get(u)[0] == p:
                    uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0], u)
                    return redirect(uri)
            return '''
                <form action="" method="post">
                    <p><input type=text name=user>
                    <p><input type=text name=pw>
                    <p><input type=submit value=Login>
                </form>
            '''
    
    
    # 客户端
    @app.route('/client/login', methods=['POST', 'GET'])
    def client_login():
        uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (client_id, redirect_uri)
        return redirect(uri)
    
    @app.route('/client/passport', methods=['POST', 'GET'])
    def client_passport():
        code = request.args.get('code')
        uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)
        return redirect(uri)
    
    
    # 资源服务器端
    @app.route('/test1', methods=['POST', 'GET'])
    def test():
        token = request.args.get('token')
        ret = verify_token(token)
        if ret:
            return json.dumps(ret)
        else:
            return 'error'
    
    if __name__ == '__main__':
        app.run(debug=True)



    运行http://localhost:5000/client/login

    登陆得到token,把token拼接到test1中测试









    极客学院:http://www.jikexueyuan.com/course/695.html


关键字