2010-12-10 python web google openid
Пример OpenID-потребителя (клиента), реализованного в виде веб-сервера: получение имени пользователя, адреса электронной почты и других данных.
Проверено с провайдерами: Google, Yandex, VKontakte.
Популярные OpenID провайдеры
Для работы скрипта нужна библиотека python-openid:
sudo apt-get install python-openid
#!/usr/bin/env python
# coding: utf8
import cgi
import urlparse
import cgitb
import sys
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
# OpenID provider URL pre-filled in the identifier form.
SERVER_OPENID = 'https://www.google.com/accounts/o8/id'
# Alternative providers; uncomment one to make it the default.
#SERVER_OPENID = 'http://openid.yandex.ru/'
#SERVER_OPENID = 'http://vkontakteid.ru/'

# Check that the python-openid library is available and stop with a
# readable message (instead of a traceback) when it is not.
try:
    import openid
except ImportError:
    # Trailing newline keeps the shell prompt off the message line.
    sys.stderr.write("Failed to import the OpenID library\n")
    sys.exit(1)
from openid.consumer import consumer
from openid.extensions import sreg, ax
class OpenIDHTTPServer(HTTPServer):
    """HTTPServer that also remembers the base URL it is served under."""

    def __init__(self, host, *args, **kwargs):
        """Create the server and compute ``self.base_url``.

        host -- host name used when building ``base_url``.
        All remaining arguments are passed to ``HTTPServer.__init__``.
        """
        HTTPServer.__init__(self, *args, **kwargs)
        port = self.server_port
        if port == 80:
            # Port 80 is left out of the URL.
            self.base_url = 'http://%s/' % host
        else:
            self.base_url = 'http://%s:%s/' % (host, port)
class OpenIDRequestHandler(BaseHTTPRequestHandler):
    """Request handler implementing a minimal OpenID consumer.

    Pages served:
        /         form asking for an OpenID identifier
        /verify   starts authentication for the submitted identifier
        /process  handles the provider's answer (the ``return_to`` URL)
    """

    @staticmethod
    def _escape(text, quote=False):
        """Return ``text`` HTML-escaped as a UTF-8 byte string.

        Empty values and None become ''. Unicode objects are encoded so
        the result can always be combined with other byte strings and
        written to the socket.
        """
        if not text:
            return ''
        if not isinstance(text, str):
            text = text.encode('utf-8')
        return cgi.escape(text, quote)

    # Handling of requests to the server
    def do_GET(self):
        """Parse the request URI and dispatch to the matching page."""
        try:
            self.parsed_uri = urlparse.urlparse(self.path)
            # One value per parameter (the last one wins), as unicode.
            self.query = {}
            for key, value in cgi.parse_qsl(self.parsed_uri[4]):
                self.query[key] = value.decode('utf-8')

            path = self.parsed_uri[2]
            if path == '/':
                self.render()
            elif path == '/verify':
                self.doVerify()
            elif path == '/process':
                self.doProcess()
            else:
                self.render('not found', status=404)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            # Debugging aid: the full traceback is sent to the client.
            self.render(cgitb.html(sys.exc_info(), context=10), 500)

    # Page rendering
    def render(self, message=None, status=200, ax_response=None,
               sreg_response=None):
        """Send a complete HTML page.

        message       -- HTML fragment to show; the identifier form is
                         shown when it is empty.
        status        -- HTTP status code of the response.
        ax_response   -- optional AX response whose data is listed.
        sreg_response -- optional SREG response whose data is listed.
        """
        self.send_response(status)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.end_headers()

        if message and not isinstance(message, str):
            # A unicode message must be encoded before hitting the socket.
            message = message.encode('utf-8')

        out = self.wfile
        out.write('<html><head><meta http-equiv="content-type" content="text/html;charset=utf-8"/></head><body>')
        if message:
            out.write(message)
        else:
            out.write("""<form method="get" accept-charset="UTF-8" action="/verify">
Identifier:<input type="text" name="openid_identifier"
value='""" + SERVER_OPENID + """' />
<input type="submit" value="Verify" /></form>""")
        # User data belongs inside the document, before the closing tags.
        if ax_response:
            self.renderAX(ax_response)
        if sreg_response:
            self.renderSREG(sreg_response)
        out.write("</body></html>")

    # Rendering of the user's attributes
    def renderAX(self, ax_response):
        """Write the AX attributes, showing the first value of each."""
        self.wfile.write('<h1>AX Data</h1>')
        for field_name, values in ax_response.data.items():
            value = self._escape(values[0]) if values else ''
            self.wfile.write('%s = %s<br/>'
                             % (self._escape(field_name), value))

    def renderSREG(self, sreg_response):
        """Write the SREG fields as ``name = value`` lines."""
        self.wfile.write('<h1>SREG Data</h1>')
        for field_name, value in sreg_response.data.items():
            self.wfile.write('%s = %s<br/>'
                             % (self._escape(field_name), self._escape(value)))

    # Sending the authentication request
    def doVerify(self):
        """Begin authentication for the submitted ``openid_identifier``."""
        openid_url = self.query.get('openid_identifier')
        if not openid_url:
            self.render('Enter an OpenID Identifier to verify.')
            return

        # Fresh empty session and no store for every request.
        oidconsumer = consumer.Consumer({}, None)
        try:
            request = oidconsumer.begin(openid_url)
        except consumer.DiscoveryFailure as exc:
            self.render('Error in discovery: %s'
                        % self._escape(str(exc.args[0])))
            return

        if request is None:
            self.render('No OpenID services found for <code>%s</code>'
                        % self._escape(openid_url))
            return

        self.requestReg(request)
        trust_root = self.server.base_url
        return_to = urlparse.urljoin(trust_root, 'process')
        self.log_message('return_to: %s', return_to)

        if request.shouldSendRedirect():
            redirect_url = request.redirectURL(trust_root, return_to)
            self.send_response(302)
            self.send_header('Location', redirect_url)
            self.end_headers()
        else:
            form_html = request.htmlMarkup(
                trust_root, return_to,
                form_tag_attrs={'id': 'openid_message'})
            if not isinstance(form_html, str):
                form_html = form_html.encode('utf-8')
            # The markup is a response body, so it needs a status line
            # and headers in front of it.
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.end_headers()
            self.wfile.write(form_html)

    # Choosing which parameters to ask the OpenID server for
    def requestReg(self, request):
        """Attach an AX fetch request, or an SREG request as fallback."""
        if request.endpoint.supportsType(ax.AXMessage.ns_uri):
            wanted = (
                ('http://axschema.org/contact/email', 'email'),
                ('http://axschema.org/namePerson', 'fullname'),
                ('http://axschema.org/namePerson/first', 'firstname'),
                ('http://axschema.org/namePerson/last', 'lastname'),
                ('http://axschema.org/pref/language', 'language'),
                ('http://axschema.org/contact/country/home', 'country'),
            )
            fetch_request = ax.FetchRequest()
            for attr, alias in wanted:
                fetch_request.add(
                    ax.AttrInfo(attr, alias=alias, required=True))
            request.addExtension(fetch_request)
        else:
            request.addExtension(sreg.SRegRequest(optional=[
                'email', 'fullname', 'nickname',
                # for vkontakte
                'gender', 'country',
            ]))

    # Handling the answer from the OpenID server
    def doProcess(self):
        """Complete authentication and render the outcome."""
        # Fresh empty session and no store: nothing is carried over
        # from the /verify request.
        oidconsumer = consumer.Consumer({}, None)

        host = self.headers.get('Host')
        if host:
            url = 'http://' + host + self.path
        else:
            # No Host header: fall back to the server's own base URL.
            url = urlparse.urljoin(self.server.base_url, self.path)
        info = oidconsumer.complete(self.query, url)

        ax_response = None
        sreg_response = None
        display_identifier = info.getDisplayIdentifier()

        if info.status == consumer.FAILURE and display_identifier:
            message = "Verification of %s failed: %s" % (
                self._escape(display_identifier),
                self._escape(info.message))
        elif info.status == consumer.SUCCESS:
            message = ("You have successfully verified %s as your identity."
                       % self._escape(display_identifier))
            ax_response = ax.FetchResponse.fromSuccessResponse(info)
            sreg_response = sreg.SRegResponse.fromSuccessResponse(info)
            if info.endpoint.canonicalID:
                message += (
                    " This is an i-name, and its persistent ID is %s"
                    % self._escape(info.endpoint.canonicalID))
            message += '<br/>'
        elif info.status == consumer.CANCEL:
            message = 'Verification cancelled'
        elif info.status == consumer.SETUP_NEEDED:
            if info.setup_url:
                message = ('<a href="%s">Setup needed</a>'
                           % self._escape(info.setup_url, True))
            else:
                message = 'Setup needed'
        else:
            message = 'Verification failed.'

        self.render(message, ax_response=ax_response,
                    sreg_response=sreg_response)
if __name__ == '__main__':
    # Serve the example on http://localhost:8001/ until interrupted.
    host = 'localhost'
    addr = (host, 8001)
    server = OpenIDHTTPServer(host, addr, OpenIDRequestHandler)
    print(server.base_url)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the example; no traceback.
        pass
    finally:
        # Release the listening socket.
        server.server_close()