[Python Development] A Single Sign-On Solution for Flask



2023-01-03 06:44 | Source: compiled from the web | Views: 265

1.SSO and CAS

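Single sign-on (SSO) lets a user authenticate once and then use several systems. After logging in once on the identity authentication server, the user gains access to the other associated systems and applications in the SSO system, and no administrator has to modify the user's login state or other information. In other words, across multiple application systems, a user needs to log in only once to access all mutually trusting applications. This reduces the time spent on repeated logins and simplifies user management, which is why the approach is widely used.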

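The Central Authentication Service (CAS) is an independent, open protocol. It began as an enterprise-grade open-source project at Yale University and aims to provide a reliable SSO solution for web applications.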
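The round trip behind a CAS login can be sketched in a few lines. The block below is only an illustration: the host names are hypothetical, the helper names are made up for this sketch, and the `/login` and `/serviceValidate` paths are the standard CAS protocol endpoints rather than anything specific to one deployment.

```python
from urllib.parse import urlencode, urlparse, parse_qs

# Hypothetical addresses, chosen only for illustration.
CAS_SERVER = "https://cas.example.com/cas"
SERVICE = "https://app.example.com/login/"


def login_url(cas_server, service):
    """Step 1: the application redirects the browser to the CAS login page."""
    return "{0}/login?{1}".format(cas_server, urlencode({"service": service}))


def ticket_from_callback(callback_url):
    """Step 2: CAS redirects back to the service with ?ticket=ST-... appended."""
    values = parse_qs(urlparse(callback_url).query).get("ticket")
    return values[0] if values else None


def validate_url(cas_server, service, ticket):
    """Step 3: the application asks CAS, server to server, whether the ticket is valid."""
    query = urlencode({"service": service, "ticket": ticket})
    return "{0}/serviceValidate?{1}".format(cas_server, query)


# Simulate the callback CAS would send after a successful login.
callback = SERVICE + "?ticket=ST-1-abc"
ticket = ticket_from_callback(callback)
print(login_url(CAS_SERVER, SERVICE))
print(validate_url(CAS_SERVER, SERVICE, ticket))
```

The user's password is only ever sent to the CAS server. The application sees a one-time service ticket and has to confirm it with CAS before trusting it.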


2.Flask-CAS

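So how do we implement single sign-on in Flask? One open-source option is Flask-CAS. In real projects, using its API directly may not meet every requirement, in which case the source code needs some modification. At the time of writing, the latest version is 1.0.2.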


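__init__.py: mainly configuration and basic functions.
cas_urls.py: builds the CAS login-related URLs and the redirect URLs.
routing.py: the core of the project. It contains three functions (login, logout, and validate), which are also the main places to modify when customizing.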
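To make the role of the validate function more concrete, here is a simplified sketch of what parsing a CAS validation response involves. It is not the library's code: the library parses the response with xmltodict, while this sketch swaps in the standard library's xml.etree.ElementTree so that it runs without extra packages. The sample responses, the user name, and the helper name `parse_validation` are made up for illustration, and repeated `cas:memberOf` elements are not handled.

```python
import xml.etree.ElementTree as ET

# Namespace used by CAS 2.0 service responses.
CAS_NS = {"cas": "http://www.yale.edu/tp/cas"}

# Hypothetical responses, shaped like CAS 2.0 serviceValidate output.
SAMPLE_SUCCESS = """<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
  <cas:authenticationSuccess>
    <cas:user>alice</cas:user>
    <cas:attributes>
      <cas:memberOf>[admins, staff]</cas:memberOf>
    </cas:attributes>
  </cas:authenticationSuccess>
</cas:serviceResponse>"""

SAMPLE_FAILURE = """<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
  <cas:authenticationFailure code="INVALID_TICKET">Ticket not recognized</cas:authenticationFailure>
</cas:serviceResponse>"""


def parse_validation(xml_text):
    """Return (username, attributes) on success, or (None, {}) on failure."""
    root = ET.fromstring(xml_text)
    success = root.find("cas:authenticationSuccess", CAS_NS)
    if success is None:
        return None, {}
    username = success.findtext("cas:user", default=None, namespaces=CAS_NS)
    attributes = {}
    attrs_node = success.find("cas:attributes", CAS_NS)
    if attrs_node is not None:
        for child in attrs_node:
            # Strip the "{namespace}" prefix ElementTree puts on tag names.
            key = child.tag.split("}", 1)[-1]
            attributes[key] = (child.text or "").strip()
    member_of = attributes.get("memberOf")
    if member_of is not None:
        # Similar to the library's handling: "[a, b]" becomes ["a", "b"].
        attributes["memberOf"] = [g.strip() for g in member_of.strip("[]").split(",")]
    return username, attributes


print(parse_validation(SAMPLE_SUCCESS))
print(parse_validation(SAMPLE_FAILURE))
```

In the real extension the username and attributes end up in the Flask session rather than being returned, so a customized validate would typically change what is extracted here and where it is stored.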

"""
flask_cas.__init__
"""

import flask
from flask import current_app

# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
    from flask import _app_ctx_stack as stack
except ImportError:
    from flask import _request_ctx_stack as stack

from . import routing

from functools import wraps

class CAS(object):
    """
    Required Configs:

    |Key             |
    |----------------|
    |CAS_SERVER      |
    |CAS_AFTER_LOGIN |

    Optional Configs:

    |Key                        | Default               |
    |---------------------------|-----------------------|
    |CAS_TOKEN_SESSION_KEY      | _CAS_TOKEN            |
    |CAS_USERNAME_SESSION_KEY   | CAS_USERNAME          |
    |CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES        |
    |CAS_LOGIN_ROUTE            | '/cas'                |
    |CAS_LOGOUT_ROUTE           | '/cas/logout'         |
    |CAS_VALIDATE_ROUTE         | '/cas/serviceValidate'|
    |CAS_AFTER_LOGOUT           | None                  |
    """

    def __init__(self, app=None, url_prefix=None):
        self._app = app
        if app is not None:
            self.init_app(app, url_prefix)

    def init_app(self, app, url_prefix=None):
        # Configuration defaults
        app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
        app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
        app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES')
        app.config.setdefault('CAS_LOGIN_ROUTE', '/cas')
        app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout')
        app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate')
        # Requires CAS 2.0
        app.config.setdefault('CAS_AFTER_LOGOUT', None)
        # Register Blueprint
        app.register_blueprint(routing.blueprint, url_prefix=url_prefix)

        # Use the newstyle teardown_appcontext if it's available,
        # otherwise fall back to the request context
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    def teardown(self, exception):
        ctx = stack.top

    @property
    def app(self):
        return self._app or current_app

    @property
    def username(self):
        return flask.session.get(
            self.app.config['CAS_USERNAME_SESSION_KEY'], None)

    @property
    def attributes(self):
        return flask.session.get(
            self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None)

    @property
    def token(self):
        return flask.session.get(
            self.app.config['CAS_TOKEN_SESSION_KEY'], None)

def login():
    return flask.redirect(flask.url_for('cas.login', _external=True))

def logout():
    return flask.redirect(flask.url_for('cas.logout', _external=True))

def login_required(function):
    @wraps(function)
    def wrap(*args, **kwargs):
        if 'CAS_USERNAME' not in flask.session:
            flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = (
                flask.request.script_root +
                flask.request.full_path
            )
            return login()
        else:
            return function(*args, **kwargs)
    return wrap


"""
flask_cas.cas_urls

Functions for creating urls to access CAS.
"""

try:
    from urllib import quote
    from urllib import urlencode
    from urlparse import urljoin
except ImportError:
    from urllib.parse import quote
    from urllib.parse import urljoin
    from urllib.parse import urlencode

def create_url(base, path=None, *query):
    """ Create a url.

    Creates a url by combining base, path, and the query's list of
    key/value pairs. Escaping is handled automatically. Any
    key/value pair with a value that is None is ignored.

    Keyword arguments:
    base -- The left most part of the url (ex. http://localhost:5000).
    path -- The path after the base (ex. /foo/bar).
    query -- A list of key value pairs (ex. [('key', 'value')]).

    Example usage:
    >>> create_url(
    ...     'http://localhost:5000',
    ...     'foo/bar',
    ...     ('key1', 'value'),
    ...     ('key2', None),     # Will not include None
    ...     ('url', 'http://example.com'),
    ... )
    'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
    """
    url = base
    # Add the path to the url if it's not None.
    if path is not None:
        url = urljoin(url, quote(path))
    # Remove key/value pairs with None values.
    query = filter(lambda pair: pair[1] is not None, query)
    # Add the query string to the url
    url = urljoin(url, '?{0}'.format(urlencode(list(query))))
    return url

def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None):
    """ Create a CAS login URL .

    Keyword arguments:
    cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
    cas_route -- The route where the CAS lives on server (ex. /cas)
    service -- (ex.  http://localhost:5000/login)
    renew -- "true" or "false"
    gateway -- "true" or "false"

    Example usage:
    >>> create_cas_login_url(
    ...     'http://sso.pdx.edu',
    ...     '/cas',
    ...     'http://localhost:5000',
    ... )
    'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
    """
    return create_url(
        cas_url,
        cas_route,
        ('service', service),
        ('renew', renew),
        ('gateway', gateway),
    )

def create_cas_logout_url(cas_url, cas_route, service=None):
    """ Create a CAS logout URL.

    Keyword arguments:
    cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
    cas_route -- The route where the CAS lives on server (ex. /cas/logout)
    url -- (ex.  http://localhost:5000/login)

    Example usage:
    >>> create_cas_logout_url(
    ...     'http://sso.pdx.edu',
    ...     '/cas/logout',
    ...     'http://localhost:5000',
    ... )
    'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000'
    """
    return create_url(
        cas_url,
        cas_route,
        ('service', service),
    )

def create_cas_validate_url(cas_url, cas_route, service, ticket,
                            renew=None):
    """ Create a CAS validate URL.

    Keyword arguments:
    cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
    cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate)
    service -- (ex.  http://localhost:5000/login)
    ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
    renew -- "true" or "false"

    Example usage:
    >>> create_cas_validate_url(
    ...     'http://sso.pdx.edu',
    ...     '/cas/serviceValidate',
    ...     'http://localhost:5000/login',
    ...     'ST-58274-x839euFek492ou832Eena7ee-cas'
    ... )
    'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas'
    """
    return create_url(
        cas_url,
        cas_route,
        ('service', service),
        ('ticket', ticket),
        ('renew', renew),
    )


"""
flask_cas.routing
"""

import flask
from xmltodict import parse
from flask import current_app
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url

try:
    from urllib import urlopen
except ImportError:
    from urllib.request import urlopen

blueprint = flask.Blueprint('cas', __name__)

@blueprint.route('/login/')
def login():
    """
    This route has two purposes. First, it is used by the user
    to login. Second, it is used by the CAS to respond with the
    `ticket` after the user logs in successfully.

    When the user accesses this url, they are redirected to the CAS
    to login. If the login was successful, the CAS will respond to this
    route with the ticket in the url. The ticket is then validated.
    If validation was successful the logged in username is saved in
    the user's session under the key `CAS_USERNAME_SESSION_KEY` and
    the user's attributes are saved under the key
    'CAS_USERNAME_ATTRIBUTE_KEY'
    """
    cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']

    redirect_url = create_cas_login_url(
        current_app.config['CAS_SERVER'],
        current_app.config['CAS_LOGIN_ROUTE'],
        flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True))

    if 'ticket' in flask.request.args:
        flask.session[cas_token_session_key] = flask.request.args['ticket']

    if cas_token_session_key in flask.session:

        if validate(flask.session[cas_token_session_key]):
            if 'CAS_AFTER_LOGIN_SESSION_URL' in flask.session:
                redirect_url = flask.session.pop('CAS_AFTER_LOGIN_SESSION_URL')
            elif flask.request.args.get('origin'):
                redirect_url = flask.request.args['origin']
            else:
                redirect_url = flask.url_for(
                    current_app.config['CAS_AFTER_LOGIN'])
        else:
            del flask.session[cas_token_session_key]

    current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))

    return flask.redirect(redirect_url)

@blueprint.route('/logout/')
def logout():
    """
    When the user accesses this route they are logged out.
    """
    cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
    cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']

    if cas_username_session_key in flask.session:
        del flask.session[cas_username_session_key]

    if cas_attributes_session_key in flask.session:
        del flask.session[cas_attributes_session_key]

    if(current_app.config['CAS_AFTER_LOGOUT'] is not None):
        redirect_url = create_cas_logout_url(
            current_app.config['CAS_SERVER'],
            current_app.config['CAS_LOGOUT_ROUTE'],
            current_app.config['CAS_AFTER_LOGOUT'])
    else:
        redirect_url = create_cas_logout_url(
            current_app.config['CAS_SERVER'],
            current_app.config['CAS_LOGOUT_ROUTE'])

    current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
    return flask.redirect(redirect_url)

def validate(ticket):
    """
    Will attempt to validate the ticket. If validation fails, then False
    is returned. If validation is successful, then True is returned
    and the validated username is saved in the session under the
    key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
    is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
    """
    cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
    cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']

    current_app.logger.debug("validating token {0}".format(ticket))

    cas_validate_url = create_cas_validate_url(
        current_app.config['CAS_SERVER'],
        current_app.config['CAS_VALIDATE_ROUTE'],
        flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True),
        ticket)

    current_app.logger.debug("Making GET request to {0}".format(
        cas_validate_url))

    xml_from_dict = {}
    isValid = False

    try:
        xmldump = urlopen(cas_validate_url).read().strip().decode('utf8', 'ignore')
        xml_from_dict = parse(xmldump)
        isValid = True if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] else False
    except ValueError:
        current_app.logger.error("CAS returned unexpected result")

    if isValid:
        current_app.logger.debug("valid")
        xml_from_dict = xml_from_dict["cas:serviceResponse"]["cas:authenticationSuccess"]
        username = xml_from_dict["cas:user"]
        flask.session[cas_username_session_key] = username

        if "cas:attributes" in xml_from_dict:
            attributes = xml_from_dict["cas:attributes"]

            if "cas:memberOf" in attributes:
                if not isinstance(attributes["cas:memberOf"], list):
                    attributes["cas:memberOf"] = attributes["cas:memberOf"].lstrip('[').rstrip(']').split(',')
                    for group_number in range(0, len(attributes['cas:memberOf'])):
                        attributes['cas:memberOf'][group_number] = attributes['cas:memberOf'][group_number].lstrip(' ').rstrip(' ')
                else:
                    for index in range(0, len(attributes['cas:memberOf'])):
                        attributes["cas:memberOf"][index] = attributes["cas:memberOf"][index].lstrip('[').rstrip(']').split(',')
                        for group_number in range(0, len(attributes['cas:memberOf'][index])):
                            attributes['cas:memberOf'][index][group_number] = attributes['cas:memberOf'][index][group_number].lstrip(' ').rstrip(' ')

            flask.session[cas_attributes_session_key] = attributes
    else:
        current_app.logger.debug("invalid")

    return isValid

3.Example

First, import the package.

from flask_cas import CAS

Usage 1:

app = Flask(__name__)
CAS(app)

Usage 2:

cas = CAS()
app = Flask(__name__)
cas.init_app(app)

The CAS class adds two routes, /login/ and /logout/. A route prefix can also be passed as the second argument to the CAS constructor or to init_app.

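The /login/ route redirects the user to the CAS server specified by the CAS_SERVER configuration value. If login succeeds, the user is redirected to the endpoint specified by CAS_AFTER_LOGIN, and the logged-in username is stored in the session under the key given by CAS_USERNAME_SESSION_KEY. If attributes are available, they are stored in the session under the key given by CAS_ATTRIBUTES_SESSION_KEY.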
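CAS_AFTER_LOGIN is in fact only the fallback. Reading the login route in routing.py, the redirect target after a successful validation is picked in a fixed order, which the sketch below mimics with plain dictionaries standing in for the Flask session and the request arguments. The function name `choose_redirect` and the sample paths are made up for illustration, and the third argument stands for the URL that the CAS_AFTER_LOGIN endpoint resolves to.

```python
def choose_redirect(session, request_args, after_login_url):
    """Pick the post-login redirect target in the same order as the login route."""
    if "CAS_AFTER_LOGIN_SESSION_URL" in session:
        # Set by login_required: the page the user originally asked for.
        return session.pop("CAS_AFTER_LOGIN_SESSION_URL")
    if request_args.get("origin"):
        # Carried through the CAS round trip as a query argument.
        return request_args["origin"]
    # Fallback: the URL of the endpoint named by CAS_AFTER_LOGIN.
    return after_login_url


# A user who was sent to login from a protected page goes back to that page.
session = {"CAS_AFTER_LOGIN_SESSION_URL": "/reports?page=2"}
print(choose_redirect(session, {}, "/"))

# A user who opened /login/ directly lands on the fallback.
print(choose_redirect({}, {}, "/"))
```

This ordering is what lets a page protected by login_required send the user back to where they started instead of always landing on the same page.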

The /logout/ route redirects the user to the CAS logout page and removes the username and attributes from the session.
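The logout behavior can be imitated without Flask to see exactly what changes. The sketch below is a simplified stand-in, not the library's code: a plain dictionary plays the session, `simulate_logout` is a hypothetical name, and the server and application addresses are made up.

```python
from urllib.parse import urlencode, urljoin


def simulate_logout(session, config):
    """Clear the CAS user data from the session and return the CAS logout URL."""
    for key in (config["CAS_USERNAME_SESSION_KEY"],
                config["CAS_ATTRIBUTES_SESSION_KEY"]):
        session.pop(key, None)
    url = urljoin(config["CAS_SERVER"], config["CAS_LOGOUT_ROUTE"])
    if config.get("CAS_AFTER_LOGOUT") is not None:
        # Ask CAS to send the user back to this address after logging out.
        url += "?" + urlencode({"service": config["CAS_AFTER_LOGOUT"]})
    return url


# Hypothetical configuration; the session keys are the extension's defaults.
config = {
    "CAS_SERVER": "https://cas.example.com",
    "CAS_LOGOUT_ROUTE": "/cas/logout",
    "CAS_USERNAME_SESSION_KEY": "CAS_USERNAME",
    "CAS_ATTRIBUTES_SESSION_KEY": "CAS_ATTRIBUTES",
    "CAS_AFTER_LOGOUT": "https://app.example.com/",
}
session = {"CAS_USERNAME": "alice", "CAS_ATTRIBUTES": {}, "_CAS_TOKEN": "ST-1"}
print(simulate_logout(session, config))
print(session)
```

Note that only the username and attributes are removed. In the routing.py source quoted in this article, the ticket stored under CAS_TOKEN_SESSION_KEY appears to be left in place by logout, which may be worth changing when customizing the extension.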

Here is a complete example:

import ssl

import flask
from flask import Flask
from flask_cas import CAS
from flask_cas import login_required

app = Flask(__name__)
cas = CAS(app)

# Address of the CAS server
app.config['CAS_SERVER'] = 'https://server.cas.com:8443/cas'
# Endpoint to redirect to after a successful CAS login
app.config['CAS_AFTER_LOGIN'] = 'login'
app.config['SECRET_KEY'] = 'dasdasdasdasdada123123423423aASDAS'

# Disable certificate verification globally (acceptable only for local testing)
ssl._create_default_https_context = ssl._create_unverified_context


@app.route('/')
@login_required
def login():
    return flask.render_template(
        'demo.html',
        username=cas.username,
    )


if __name__ == '__main__':
    # Host and port of this client application
    app.run("app1.cas.com", 9100, debug=True)

demo.html

<!DOCTYPE html>
<html>
<head>
    <title>Title</title>
</head>
<body>
    {{ username }}
</body>
</html>
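The example disables certificate verification for the whole process, which also affects every other HTTPS request the application makes. A narrower alternative, sketched below under the assumption that you are willing to change how the validation request is made (the extension calls urlopen without a context, so this would mean modifying its source), is to build an SSL context that still verifies the server and pass it explicitly. The helper names and the optional `cafile` path are hypothetical.

```python
import ssl
from urllib.request import urlopen


def make_verifying_context(cafile=None):
    """Build an SSL context that still verifies the server certificate.

    cafile is an optional path to a CA bundle, for example an internal CA
    that signed the CAS server's certificate (hypothetical in this sketch).
    """
    return ssl.create_default_context(cafile=cafile)


def fetch(url, context):
    """Fetch a URL with an explicit SSL context instead of a patched global default."""
    with urlopen(url, context=context) as response:
        return response.read()


context = make_verifying_context()
# Verification and host name checking stay enabled on this context.
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```

With a self-signed CAS server, pointing `cafile` at the server's CA certificate keeps verification on while still letting the ticket validation request succeed.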

