Home

Tags

python openid google

2010-12-10 python web google openid

Пример OpenID потребителя (клиента) в качестве web-сервера, получение имени пользователя, почтового ящика и пр.
Проверено на: google, yandex, vkontakte
Популярные OpenID провайдеры

Для работы скрипта нужен python-openid

sudo apt-get install python-openid
для ubuntu/debian
Запустите скрипт и откройте в браузере адрес localhost:8001
(данный код периодически меняю — добавляю поддержку разных порталов)
#!/usr/bin/env python
# coding: utf8

import cgi
import urlparse
import cgitb
import sys

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

# OpenID identifier pre-filled in the form shown on the start page.
# Uncomment one of the alternatives below to try another provider.
SERVER_OPENID = 'https://www.google.com/accounts/o8/id'
#SERVER_OPENID = 'http://openid.yandex.ru/'
#SERVER_OPENID = 'http://vkontakteid.ru/'

# Check that the openid package is available; stop with a readable message
# instead of a traceback when it is not.
try:
    import openid
except ImportError:
    # Trailing newline so the shell prompt does not end up on the same line.
    sys.stderr.write("Failed to import the OpenID library\n")
    sys.exit(1)

from openid.consumer import consumer
from openid.extensions import sreg, ax

class OpenIDHTTPServer(HTTPServer):
    """HTTPServer that also records the URL it is reachable at.

    ``base_url`` is built from the ``host`` argument and the port the server
    is bound to; the port is left out of the URL when it is 80.
    """

    def __init__(self, host, *args, **kwargs):
        """Create the server and compute ``base_url``.

        host -- host name to put into ``base_url``.
        *args, **kwargs -- forwarded unchanged to ``HTTPServer.__init__``.
        """
        HTTPServer.__init__(self, *args, **kwargs)
        port = self.server_port
        if port == 80:
            self.base_url = 'http://%s/' % host
        else:
            self.base_url = 'http://%s:%s/' % (host, port)

class OpenIDRequestHandler(BaseHTTPRequestHandler):
    # Handle requests to the server
    def do_GET(self):
        try:
            self.parsed_uri = urlparse.urlparse(self.path)
            self.query = {}
            for k, v in cgi.parse_qsl(self.parsed_uri[4]):
                self.query[k] = v.decode('utf-8')

            path = self.parsed_uri[2]
            if path == '/': self.render()
            elif path == '/verify': self.doVerify()
            elif path == '/process': self.doProcess()
            elif path == '/affiliate': self.doAffiliate()
            else: self.render('not found',status=403)
        except (KeyboardInterrupt, SystemExit):
            raise
        except: self.render(cgitb.html(sys.exc_info(), context=10), 500)
    # Render the page
    def render(self, message=None, status=200, ax_response=None, sreg_response=None):
        self.send_response(status)
        self.end_headers()
        self.wfile.write('<html><head><meta http-equiv="content-type" content="text/html;charset=utf-8"/></head><body>')
        if message: self.wfile.write(message)
        else: self.wfile.write("""<form method="get" accept-charset="UTF-8" action="/verify">
            Identifier:<input type="text" name="openid_identifier"
            value='""" + SERVER_OPENID + """' />
            <input type="submit" value="Verify" /></form>""")
        self.wfile.write("</body></html>")
        if ax_response: self.renderAX(ax_response)
        if sreg_response: self.renderSREG(sreg_response)
    # Render the user's attributes
    def renderAX(self,ax_response):
        self.wfile.write('<h1>AX Data</h1>')
        for field_name, v in ax_response.data.items():
            field_name = field_name.encode('utf8')
            value = cgi.escape(v[0].encode('utf8') if v else '')
            self.wfile.write('%s = %s<br/>' % (field_name, value))
    def renderSREG(self,sreg_response):
        self.wfile.write('<h1>SREG Data</h1>')
        for field_name, v in sreg_response.data.items():
            value = cgi.escape(v.encode('UTF-8') if v else '')
            self.wfile.write('%s = %s<br/>' % (field_name, value))
    # Send the authorization request
    def doVerify(self):
        openid_url = self.query['openid_identifier']
        if not openid_url:
            self.render('Enter an OpenID Identifier to verify.')
            return

        oidconsumer = consumer.Consumer({}, None)
        try:
            request = oidconsumer.begin(openid_url)
        except consumer.DiscoveryFailure, exc:
            fetch_error_string = 'Error in discovery: %s' % ( cgi.escape(str(exc[0])) )
            self.render(fetch_error_string)
        else:
            if request is None:
                msg = 'No OpenID services found for <code>%s</code>' % ( cgi.escape(openid_url),)
                self.render(msg)
            else:
                self.requestReg(request)

                trust_root = self.server.base_url
                return_to = urlparse.urljoin(self.server.base_url, 'process')
                print return_to
                if request.shouldSendRedirect():
                    redirect_url = request.redirectURL(trust_root, return_to)
                    self.send_response(302)
                    self.send_header('Location', redirect_url)
                    self.writeUserHeader()
                    self.end_headers()
                else:
                    form_html = request.htmlMarkup(
                        trust_root, return_to,
                        form_tag_attrs={'id':'openid_message'})
                    self.wfile.write(form_html)
    # Configure which attributes to request from the OpenID server
    def requestReg(self, request):
        """Add a user-attribute extension to the outgoing OpenID request.

        When the request's endpoint reports support for the AX message type,
        an AX fetch request for six required attributes is attached;
        otherwise an SREG request with optional fields is attached.
        """
        if not request.endpoint.supportsType(ax.AXMessage.ns_uri):
            # 'gender' and 'country' are requested for vkontakte.
            optional_fields = ['email', 'fullname', 'nickname', 'gender', 'country']
            request.addExtension(sreg.SRegRequest(optional=optional_fields))
            return

        # (alias, attribute type URI), in the order they are added.
        wanted = (
            ('email', 'http://axschema.org/contact/email'),
            ('fullname', 'http://axschema.org/namePerson'),
            ('firstname', 'http://axschema.org/namePerson/first'),
            ('lastname', 'http://axschema.org/namePerson/last'),
            ('language', 'http://axschema.org/pref/language'),
            ('country', 'http://axschema.org/contact/country/home'),
        )
        fetch = ax.FetchRequest()
        for alias, type_uri in wanted:
            fetch.add(ax.AttrInfo(type_uri, alias=alias, required=True))
        request.addExtension(fetch)
    # Handle the response from the OpenID server
    def doProcess(self):
        """Complete authentication from the provider's response and show it.

        Passes ``self.query`` and the URL of the current request to the
        consumer, then renders a message that depends on the resulting
        status.  On success the AX and SREG responses are extracted and
        handed to ``render`` as well.
        """
        oidconsumer = consumer.Consumer({}, None)

        # URL of this request as the browser addressed it; fall back to the
        # server's own base URL when the client sent no Host header.
        host = self.headers.get('Host')
        if host:
            url = 'http://' + host + self.path
        else:
            url = urlparse.urljoin(self.server.base_url, self.path)
        info = oidconsumer.complete(self.query, url)

        ax_response = None
        sreg_response = None
        display_identifier = info.getDisplayIdentifier()

        if info.status == consumer.FAILURE and display_identifier:
            fmt = "Verification of %s failed: %s"
            # The failure text goes into the page, so it is escaped like the
            # identifier; '%s' first, because it is not necessarily a string.
            message = fmt % (cgi.escape(display_identifier),
                             cgi.escape('%s' % (info.message,)))
        elif info.status == consumer.SUCCESS:
            fmt = "You have successfully verified %s as your identity."
            message = fmt % (cgi.escape(display_identifier),)
            ax_response = ax.FetchResponse.fromSuccessResponse(info)
            sreg_response = sreg.SRegResponse.fromSuccessResponse(info)

            if info.endpoint.canonicalID:
                message += ("  This is an i-name, and its persistent ID is %s"
                            % (cgi.escape(info.endpoint.canonicalID),))
            message += '<br/>'
        elif info.status == consumer.CANCEL:
            message = 'Verification cancelled'
        elif info.status == consumer.SETUP_NEEDED:
            if info.setup_url:
                # Second argument 1 also escapes double quotes, which is
                # required inside the quoted href attribute.
                message = '<a href="%s">Setup needed</a>' % (cgi.escape(info.setup_url, 1),)
            else:
                message = 'Setup needed'
        else:
            message = 'Verification failed.'

        self.render(message, ax_response=ax_response, sreg_response=sreg_response)

if __name__ == '__main__':
    # Serve the demo on localhost:8001 and print the base URL to open.
    host = 'localhost'
    addr = (host, 8001)
    server = OpenIDHTTPServer(host, addr, OpenIDRequestHandler)
    print(server.base_url)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo; exit without a traceback.
        pass
    finally:
        # Release the listening socket.
        server.server_close()