"""
open/DurusWorks/qp/pub/publish.py
"""
from base64 import b64decode
from durus.btree import BTree
from durus.connection import Connection
from durus.error import ConflictError
from durus.file_storage import TempFileStorage, FileStorage
from durus.storage_server import StorageServer
from durus.utils import as_bytes
from os import getpid
from pprint import pformat
from qp.fill.directory import Directory
from qp.fill.form import Form
from qp.hub.web import run_web
from qp.lib.site import Site
from qp.lib.spec import spec, add_getters, specify, match
from qp.lib.tz import UTC
from qp.lib.util import randbytes, urljoin, as_str
from qp.mail.send import Email
from qp.pub.common import get_hit, set_publisher, get_request, get_response
from qp.pub.common import get_user, get_session, site_now
from qp.pub.hit import Hit
from qp.pub.session import Session
from qp.pub.user import User, compute_digest
from qpy import xml
from socket import getfqdn
from sys import exc_info
from traceback import format_exception
import binascii
import qp.lib.site
import sys


class RespondNow (Exception):
    """
    This Exception is used to break out of the path traversal.
    The Publisher expects the contents of the response to be set
    before this exception is raised.
    """


class Publisher (object):
    """
    The basic QP Publisher class.  This implements the normal traversal,
    page generation, and error handling pattern used in QP.
    Subclasses may override fill_response() or other methods to customize
    this as desired.
    """
    site_is = spec(
        Site,
        "the QP Site")
    root_directory_is = spec(
        Directory,
        "The root directory")
    hit_is = spec(
        Hit,
        "The Hit (Request, Response, Session, etc.) currently being "
        "processed, or None.")

    # Name of the Site used when no site is passed to the constructor.
    test_site_name = "proto"

    def __init__(self, site=None, hub=False, **other):
        """(site:Site=None, hub:bool=False, **other)
        Register this instance as the current publisher and record the
        site.  Extra keyword arguments are accepted and ignored here.
        """
        set_publisher(self)
        self.set_hit(None)
        specify(self, site=site or Site(self.test_site_name))
        self.root_directory = None
        self.hub = hub

    @staticmethod
    def run_web(site):
        """
        Launch the web server(s) for this site.
        The default implementation uses the qp web server to provide
        http and scgi service if addresses for those services are present
        in self.configuration.
        """
        # Inside the function body this name resolves to the module-level
        # run_web imported from qp.hub.web, not to this static method.
        run_web(site)

    def __call__(self, wsgi_env, wsgi_start_response):
        """(wsgi_env:dict, wsgi_start_response:callable)
        This makes it possible to use a Publisher instance as a
        wsgi application.
        """
        input = wsgi_env[as_bytes('wsgi.input')]
        hit = self.process(input, wsgi_env)
        response = hit.get_response()
        status = as_bytes("%03d %s" % response.get_status())
        headers = [(as_bytes(k), as_bytes(v))
                   for k, v in response.generate_headers()]
        wsgi_start_response(status, headers)
        return response.generate_body_chunks()

    def process(self, stdin, env):
        """
        Return a Hit with a complete response.
        Subclasses that want to customize the Hit class may do so by
        overriding this method.
        """
        if env.get('SCRIPT_NAME') and env.get('PATH_INFO') == '':
            # The "script" here is your QP application, so the SCRIPT_NAME
            # should be the part of the url, after the host name, that
            # causes requests to be handled by your QP application.
            # The PATH_INFO part of the path after the SCRIPT_NAME.
            # We expect applications running at the top of the URL space
            # to get requests where the SCRIPT_NAME is the empty string
            # and the PATH_INFO starts with a '/'.
            # Your HTTP server is not providing the expected CGI environment.
            # The full path part of the URL is being put in the SCRIPT_NAME.
            # We'll try to make it work anyway, but we are assuming here
            # that your application is running at the top of your URL space.
            # If you want your application to run somewhere else in your URL
            # space, you must change your server so that it provides the
            # correct environment variables.  Alternatively, your publisher
            # class can override this method and hack the environment as
            # needed to fit QP's expectations.
            env['PATH_INFO'] = env['SCRIPT_NAME']
            env['SCRIPT_NAME'] = ""
        hit = Hit(stdin, env)
        self.process_hit(hit)
        return hit

    def process_hit(self, hit):
        """(hit:Hit)
        This processes the request on the hit and sets the response.
        Subclasses that want to customize error handling may do so by
        overriding this method.
        """
        self.set_hit(hit)
        try:
            hit.init_response()
            self.fill_response()
        except RespondNow:
            pass
        except:
            # Deliberately broad: handle_exception() reports the error and
            # re-raises SystemExit itself.
            self.handle_exception()
        self.log_hit()

    def get_fields(self):
        """
        Return the fields of the current request.  If they cannot be
        parsed, log the hit and respond immediately with a 400 page.
        """
        try:
            return self.get_hit().get_request().get_fields()
        except (ValueError, UnicodeDecodeError, IOError):
            print(sys.exc_info()[1])
            self.log_hit()
            self.respond("Bad Request", "We could not process your request.",
                status=400)

    def fill_response(self):
        """
        Set the response.
        Subclasses that want to use some other template system may do so by
        overriding this method.
        """
        self.fill_response_using_root_directory()

    def set_hit(self, hit):
        """(hit:Hit)
        Set the current Hit.
        """
        self.hit = hit

    def respond_now(self):
        """
        This breaks out of path traversal.  When this is called,
        the response should have already been prepared for sending.
        """
        raise RespondNow

    def handle_exception(self):
        """
        This is called when there is an error exception while
        processing a request.
        """
        self.log_exception()
        if exc_info()[0] is SystemExit:
            raise
        self.get_hit().init_response()
        self.get_hit().get_response().set_status(500)
        if self.display_exceptions():
            self.display_exception()
        else:
            self.hide_exception()

    def secure(self):
        """If the scheme is not https, redirect so that it will be.
        """
        if (get_request().get_scheme() != 'https' and
            self.get_site().get_https_address()):
            self.redirect(self.complete_url('', secure=True))

    def complete_path(self, path):
        """(path:str) -> str
        Turn path into a complete path, prepending the script_name if
        path starts with a slash.
        """
        s = str(path)
        if s.startswith('/'):
            return get_request().get_script_name() + s
        else:
            return s

    def complete_url(self, path, secure=False):
        """(path:str, secure:bool=False) -> str
        Turn path into a complete url to this publisher, changing
        the scheme to https if secure is True.
        """
        s = str(path)
        if self.get_hit() is None:
            return s
        if not secure and s.startswith('http://'):
            return s
        if s.startswith('https://'):
            return s
        if secure:
            host, port = self.get_site().get_https_address()[:2]
            if not host:
                # Fall back to the host name the request was addressed to.
                host = get_request().get_server().split(':')[0]
            if port == 443:
                address = str(host)
            else:
                address = "%s:%s" % (host, port)
            base = 'https://%s%s' % (address, get_request().get_path_query())
        else:
            base = get_request().get_url()
        return urljoin(base, self.complete_path(s))

    def redirect(self, location, permanent=False):
        """(location:str, permanent:boolean=False)
        This prepares a redirect response and uses an exception to break
        out of the path traversal and return the response immediately.
        """
        if permanent:
            status = 301
        else:
            status = 302
        response = self.get_hit().get_response()
        response.set_status(status)
        response.set_header('location', self.complete_url(location))
        response.set_content_type('text/plain', 'iso-8859-1')
        response.set_body(
            "Your browser should have redirected you to %s" % location)
        self.respond_now()

    def not_found(self, body=None):
        """(body:str)
        This prepares a 404 response and uses an exception to break
        out of the path traversal and return the response immediately.
        """
        self.respond('Not Found', body or 'That page is not here.',
            status=404)

    def is_live_host(self):
        """() -> bool
        Is this machine's fully qualified name the site's live host name?
        """
        return self.get_site().get_live_host_name() == getfqdn()

    def is_staging_host(self):
        """() -> bool
        Is this machine's fully qualified name the site's staging host name?
        """
        return self.get_site().get_staging_host_name() == getfqdn()

    def is_hub(self):
        """() -> boolean
        Was this Publisher created as part of the qp.hub scgi or http server?
        """
        return self.hub

    def log_exception(self):
        """
        This is called to record an error exception.
        """
        report = self.format_exception_report()
        print(report)
        if self.is_live_host():
            self.send_to_administrator(report)

    def display_exception(self):
        """
        This places an exception report in the response.
        """
        get_hit().get_response().set_body(self.format_exception_report())
        get_hit().get_response().set_content_type('text/plain', 'iso-8859-1')

    def format_user_info_for_log(self):
        """() -> str
        Return the user part of a hit log line.  The base class has no
        user information, so it returns placeholders.
        """
        return '- -'

    def format_log_hit_line(self):
        """() -> str
        Return the log line describing the current hit.
        """
        # NOTE: the local names below are the keys of the format string
        # (it is filled from locals()), so they must not be renamed.
        hit = get_hit()
        code = hit.get_response().get_status_code()
        length = hit.get_response().get_content_length() or 0
        mime_type = hit.get_response().get_mime_type() or '-'
        charset = (hit.get_response().get_charset() or '-').replace(' ', '_')
        duration = str(site_now() - hit.get_time()).lstrip('0:')
        request = hit.get_request()
        path_query = request.get_path_query()
        method = request.get_method()
        remote = request.get_remote_address()
        scheme = request.get_scheme()
        try:
            user = self.format_user_info_for_log()
        except:
            # Best effort: a failure here must not prevent logging the hit.
            user = "! !"
            print("[!%s in format_user_info_for_log()]" % exc_info()[0])
        referrer = request.get_header('http-referer', '-').replace(' ', '_')
        agent = request.get_header('user-agent', '-').replace(' ', '_')
        time = hit.get_time().strftime("%Y-%m-%d %H:%M:%S")
        pid = getpid()
        msg = ('%(time)s %(code)s %(length)s %(mime_type)s %(charset)s %(duration)s %(remote)s %(user)s %(pid)s '
            '%(scheme)s %(method)s %(path_query)s %(referrer)s %(agent)s' %
            locals())
        return msg

    def log_hit(self):
        """
        This logs the processing of a request.
        """
        print(self.format_log_hit_line())

    def format_exception_report(self):
        """() -> str
        This returns a string that reports an error exception.
        """
        hit = get_hit()
        request = hit.get_request()
        report = ''.join(format_exception(*exc_info()))
        report += '\npath = %r' % request.get_path()
        report += '\nquery= %r' % request.get_query()
        report += '\n\ninfo:\n' + pformat(hit.get_info())
        report += '\n\nvars(request):\n' + pformat(vars(request))
        report += '\n'
        return report

    def hide_exception(self):
        """
        This sets the response to be a page that can be shown when there
        is an error exception, but you don't want to expose the details
        of the exception.
        """
        get_hit().get_response().set_status(500)
        # NOTE(review): the paragraph tags were missing from the reviewed
        # copy of this literal (only blank lines remained); restored as <p>
        # elements -- confirm against the deployed page.
        get_hit().get_response().set_body(
            self.page('Regrets',
                      xml("<p>This page is temporarily unavailable.</p>"
                          "<p>Please try again later.</p>")))

    def get_root_directory(self):
        """() -> Directory
        Return the root directory, creating it from the site's root
        directory class on first use.
        """
        if self.root_directory is None:
            self.root_directory = self.site.get_root_directory_class()()
        return self.root_directory

    def fill_response_using_root_directory(self):
        """
        Traverse the components, set the response body, using the
        normal _q_traverse() method.
        """
        path = get_request().get_path_info()
        # Drop the first character (normally the leading '/') before
        # splitting into components.
        components = path[1:].split('/')
        body = self.get_root_directory()._q_traverse(components)
        get_response().set_body(body)

    def display_exceptions(self):
        """() -> bool
        Should the details of error exceptions be reported in responses?
        """
        return not self.is_live_host()

    def respond(self, title, *content, **kwargs):
        """(title:str, *content, **kwargs)
        Fill the response using the page() method and the given title,
        content, and **kwargs and return immediately.
        The 'status' keyword, if present, is used as the response status
        (default 400) and is not passed on to page().
        """
        self.get_hit().init_response()
        status = kwargs.pop('status', 400)
        self.get_hit().get_response().set_status(status)
        self.get_hit().get_response().set_body(
            self.page(title, *content, **kwargs))
        self.respond_now()

    def respond_plain(self, body):
        """(body:str)
        Respond immediately with the given text/plain body.
        """
        self.get_hit().get_response().set_status(200)
        self.get_hit().get_response().set_content_type('text/plain', 'utf-8')
        self.get_hit().get_response().set_body(body)
        self.respond_now()

    # Email-related methods.

    def get_webmaster_address(self):
        """() -> email_address:str
        """
        return 'webmaster@%s' % getfqdn()

    def get_debug_address(self):
        """() -> email_address:str|None
        If this returns a nonempty string, the qp.mail.send.Email class
        will use this as the smtp recipient for all messages it sends.
        If you want email sent by that class to go to the normal recipients,
        you must override this to return None.
        """
        if self.is_live_host():
            return None
        else:
            return self.get_webmaster_address()

    def is_email_enabled(self):
        """() -> bool
        The qp.mail.send.Email class will never send any email unless this
        returns true.
        If you want any email sent (using that class), you must override
        this to return True.
        """
        return False

    def get_smtp_server(self):
        """() -> str
        """
        return 'localhost'

    def send_to_administrator(self, message):
        """(message:str)
        Email the message, with the site name as subject, if email is
        enabled.
        """
        if self.is_email_enabled():
            email = Email()
            email.set_subject(self.get_site().get_name())
            email.set_body(message)
            if email.send():
                print('\nemail:sent')

    # Page formatting methods.

    def header(self, title, doctype='xhtml1-strict', style=None, **kwargs):
        """(title, **kwargs) -> xml
        Return the site-standard html header.
        """
        # NOTE(review): every markup literal in this method was empty in the
        # reviewed copy (the tags had been stripped), which made the
        # '%' formatting below raise "not all arguments converted".
        # The literals are restored here as conventional XHTML 1.0 markup --
        # confirm against the deployed site before release.
        if doctype == 'xhtml1-transitional':
            s = xml(
                '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" '
                '"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">')
        elif doctype == 'xhtml1-strict':
            s = xml(
                '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" '
                '"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">')
        else:
            s = xml()
        s += xml('<html xmlns="http://www.w3.org/1999/xhtml" '
                 'xml:lang="en" lang="en">')
        s += xml('<head>')
        if style is not None:
            s += xml('<style type="text/css">%s</style>') % style
        s += xml('<title>%s</title></head>') % title
        s += xml('<body class="%s">') % self.get_agent_class()
        return s

    def footer(self, *args, **kwargs):
        """(**kwargs) -> xml
        Return the site-standard html footer.
        """
        # NOTE(review): literal was empty in the reviewed copy; restored to
        # close the elements opened by header() -- confirm.
        return xml('</body></html>')

    def page(self, title, *content, **kwargs):
        """(title, *content, **kwargs) -> xml
        Return a page formatted according to the site-standard.
        """
        return (self.header(title, **kwargs) +
                xml("").join(content) +
                self.footer(title=title, **kwargs))

    def get_time_zone(self):
        """Return the time zone used by format_date_time() (UTC here)."""
        return UTC

    def format_date_time(self, date):
        """(date : datetime) -> str
        Format the datetime in this publisher's time zone; a false value
        gives the empty string.
        """
        if date:
            tz = self.get_time_zone()
            if tz is not date.tzinfo:
                date = date.astimezone(tz)
            return date.strftime('%Y-%m-%d %H:%M %Z')
        else:
            return ''

    def format_date(self, date):
        """(date : date) -> str
        """
        if date:
            return date.strftime('%Y-%m-%d')
        else:
            return ''

    def format_user(self, user):
        """Return the user's id, or the empty string for a false user."""
        if user:
            return user.get_id()
        else:
            return ''

    def get_platform_class(self):
        """() -> str
        Classify the requesting platform from the user-agent header.
        """
        agent = get_request().get_header('user-agent', '-')
        if 'Windows' in agent:
            return 'Windows'
        if 'Mac OS X' in agent:
            return 'OSX'
        return 'standard-platform'

    def get_agent_class(self):
        """() -> str
        Classify the requesting browser from the user-agent header.
        The checks are ordered; the first match wins.
        """
        agent = get_request().get_header('user-agent', '-')
        if 'MSIE 5.' in agent:
            if 'Mac' in agent:
                return 'MacIE5'
            else:
                return 'IE5'
        if 'MSIE 6.' in agent:
            return 'IE6'
        if 'MSIE 7.' in agent:
            return 'IE7'
        if 'MSIE 8.' in agent:
            return 'IE8'
        if 'MSIE 9.' in agent:
            return 'IE9'
        if 'iPhone' in agent:
            return 'iPhone'
        if 'iPad' in agent:
            return 'iPad'
        if 'Android 2.2' in agent:
            return 'Android22'
        if 'Android 2.3' in agent:
            return 'Android23'
        if 'Android 3.0' in agent:
            return 'Android30'
        if 'Android' in agent:
            return 'Android'
        if 'Firefox' in agent:
            return 'Firefox'
        return 'standard-agent'

add_getters(Publisher)


class DurusPublisher (Publisher):
    """
    This is the standard Publisher class for QP sites that use a
    Durus Database.
    """
    connection_is = spec(
        Connection,
        "the Durus Connection")

    test_site_name = "proto"

    def __init__(self, connection=None, **other):
        """(connection:Connection=None, **other)
        Use the given Durus connection, or a new one on temporary file
        storage, and make sure the database has its basic structure.
        """
        Publisher.__init__(self, **other)
        self.connection = connection or Connection(TempFileStorage())
        durus_cache_size = self.get_site().get_durus_cache_size()
        self.connection.set_cache_size(durus_cache_size)
        self.ensure_database_initialized()

    def ensure_database_initialized(self):
        """
        Make sure that database root has sessions and users mappings.
        If not, make them and commit().
        """
        self.ensure_sessions_initialized()
        self.ensure_users_initialized()

    def ensure_sessions_initialized(self):
        """Create and commit the sessions mapping if it is missing."""
        if self.get_sessions() is None:
            self.get_root()['sessions'] = BTree()
            self.commit()

    def ensure_users_initialized(self):
        """
        Create and commit the users mapping if it is missing.  The new
        mapping starts with one user whose id is the empty string.
        """
        if self.get_users() is None:
            users = BTree()
            users[''] = self.create_user("")
            self.get_root()['users'] = users
            self.commit()

    @staticmethod
    def run_durus(site):
        """
        Launch the durus server for this site.
        """
        server = StorageServer(
            FileStorage(site.get_durus_file()),
            gcbytes=site.get_gcbytes(),
            address=site.get_durus_address())
        site.ensure_uid_gid_not_root()
        server.serve()

    def create_user(self, user_id):
        """Return a new User with the given id."""
        return User(user_id)

    def create_session(self):
        """Return a new Session."""
        return Session()

    def commit(self):
        """Commit changes to the database."""
        self.get_connection().commit()

    def abort(self):
        """
        Make sure that the connection has no uncommitted object state.
        """
        self.get_connection().abort()

    def get_root(self):
        """() -> PersistentDict
        Return the root object of the database.
        """
        return self.get_connection().get_root()

    def get_sessions(self):
        """() -> BTree
        Return the session mapping.
        """
        return self.get_root().get('sessions')

    def get_users(self):
        """() -> BTree
        Returns the mapping from user ids to user instances.
        """
        return self.get_root().get('users')

    def gen_active_users(self):
        """Generate every user in the users mapping."""
        for user in self.get_users().itervalues():
            yield user

    def add_user(self, user):
        """(User)
        Add a new user to the application.
        """
        users = self.get_users()
        assert user.get_id() not in users
        users[user.get_id()] = user

    def lookup_user(self, id=None, email=None, **kwargs):
        """
        Returns a User or None if no such user is found.
        The id is tried first; otherwise, if email matches the email spec
        of the '' user, users are searched for a case-insensitive email
        match.
        """
        user = self.get_users().get(id)
        if user is not None:
            return user
        if (email is not None and
            match(email, self.get_users().get('').email_is)):
            lower_email = email.lower()
            for user in self.get_users().itervalues():
                user_email = user.get_email()
                # Compare lower-case to lower-case.  (The comparison used
                # to be against the raw email, so mixed-case addresses
                # never matched.)
                if (user_email is not None and
                    user_email.lower() == lower_email):
                    return user
        return None

    # Authentication methods

    def sign_out(self, url):
        """(url:str)
        Un-authenticate the current user and redirect to `url`.
        """
        get_session().clear_authentication()
        self.redirect(url)

    def delete_current_session(self):
        """
        Remove the current session from the sessions mapping, if it is
        stored there.
        """
        current_session = get_session()
        current_key = None
        for key, session in self.get_sessions().items():
            # Identity test against the current session.  (This used to
            # read "session is session", which is always true and so
            # selected whichever session happened to come first.)
            if session is current_session:
                current_key = key
                break
        if current_key is not None:
            del self.get_sessions()[current_key]

    def ensure_signed_in_using_form(self, title='Please Sign In', realm=None,
                                    **kwargs):
        """
        Make sure that the current user is signed in.
        This presents a form to the user.
        Because the form transmits the password, this redirects to the
        https address before presenting the form.
        The realm, if given, identifies which of the user's passwords should
        be used.  The default realm is the name of the site.
        """
        if not get_user():
            self.secure()
            form = Form(use_tokens=False)
            default_user_id = None
            if get_session().get_owner():
                default_user_id = get_session().get_owner().get_id()
            form.add_string(
                'name', value=default_user_id, title="User Name",
                required=True)
            form.add_password('password', title="Password", required=True)
            form.add_submit('login', 'Sign in')
            def show_form():
                # respond() raises RespondNow, so this never returns.
                self.respond(title, form.render(), status=403, **kwargs)
            if not form.is_submitted() or form.has_errors():
                show_form()
            user = self.get_users().get(form.get('name'))
            if user and user.has_password(form.get('password') or '',
                                          realm=realm):
                get_session().set_authenticated(user)
                self.redirect('')
            if not user:
                form.set_error('name', 'unknown user')
            else:
                form.set_error('password', 'wrong password')
            show_form()

    def ensure_signed_in_using_basic(self, realm=None):
        """
        Make sure that the current user is signed in.
        This uses HTTP Basic Authentication.
        Because Basic Authentication transmits the password, this
        implementation redirects if necessary to make sure that the
        challenge only happens when the scheme is https.
        The realm, if given, identifies which of the user's passwords should
        be used.  The default realm is the name of the site.
        """
        if not get_user():
            # Look to see if authentication credentials have been delivered.
            authorization = get_request().get_header(
                'HTTP_AUTHORIZATION', '').split()
            if len(authorization) == 2:
                scheme, encoded = authorization
                try:
                    b64decoded = b64decode(as_bytes(encoded))
                except binascii.Error:
                    pass
                else:
                    # Split on the first colon only: the password part of
                    # Basic credentials may itself contain colons.
                    decoded = as_str(b64decoded).split(':', 1)
                    if scheme.lower() == 'basic' and len(decoded) == 2:
                        username, password = decoded
                        user = self.get_users().get(username)
                        if user and user.has_password(password, realm=realm):
                            get_session().set_authenticated(user) # success
        if not get_user():
            # Issue an authentication challenge.
            assert self.get_site().get_https_address(), (
                "If you want basic authentication, use https.")
            self.secure() # This redirects to https.
            if realm is None:
                realm = self.get_site().get_name()
            self.get_hit().init_response()
            self.get_hit().get_response().set_status(401)
            self.get_hit().get_response().set_header(
                'WWW-Authenticate', 'Basic realm="%s"' % realm)
            self.respond_now()

    def ensure_signed_in_using_digest(self, realm=None):
        """
        Make sure that the current user is signed in.
        This uses HTTP Digest Authentication.
        Since this does not transmit the password itself, we
        allow this to work even when https is not available.
        If https is available, though, we redirect to it first.
        The realm, if given, identifies which of the user's passwords should
        be used.  The default realm is the name of the site.
        """
        if not get_user():
            if realm is None:
                realm = self.get_site().get_name()

            def attempt_digest_authentication():
                # Authenticate the session if the request carries a valid
                # digest response; otherwise return without doing anything.
                authorization = get_request().get_header(
                    'HTTP_AUTHORIZATION', '').split()
                if len(authorization) < 5 and len(authorization) > 0:
                    # The parameters may have arrived without spaces
                    # between them; split the last item on commas.
                    authorization[-1:] = authorization[-1].split(',')
                if len(authorization) < 5:
                    return
                if authorization[0].lower() != 'digest':
                    return
                parameters = {}
                for item in authorization[1:]:
                    item_split = item.split('=', 1)
                    if len(item_split) != 2:
                        continue
                    name, value = item_split
                    # endswith() is safe for an empty value ("name=").
                    if value.endswith(','):
                        value = value[:-1]
                    if value and value[0] == '"' and value[-1] == '"':
                        value = value[1:-1]
                    parameters[name.lower()] = value
                username = parameters.get('username', None)
                if not username:
                    return
                user = self.get_users().get(username)
                if not user:
                    return
                nonce = parameters.get('nonce')
                # The nonce must be one issued by the challenge below, and
                # it is removed so that it can be used only once.
                if nonce not in get_user().get_tokens():
                    return
                get_user().get_tokens().remove(nonce)
                cnonce = parameters.get('cnonce')
                if not cnonce:
                    return
                nc = parameters.get('nc')
                if not nc:
                    return
                qop = parameters.get('qop')
                if not qop:
                    return
                request_digest = parameters.get('response', '')
                if len(request_digest) != 32:
                    return
                # The realm sent by the client selects the stored digest.
                requested_realm = parameters.get('realm')
                user_digest = user.get_digester().get_digest(requested_realm)
                if not user_digest:
                    return
                method = get_request().get_method()
                if request_digest == compute_digest(
                    user_digest, nonce, nc, cnonce, qop,
                    compute_digest(method, parameters.get('uri', ''))):
                    # success
                    get_session().set_authenticated(user)

            attempt_digest_authentication()
            if not get_user():
                # Issue an authentication challenge.
                self.secure()
                self.get_hit().init_response()
                self.get_hit().get_response().set_status(401)
                self.get_hit().get_response().set_body("Do I know you?")
                nonce = get_user().get_tokens().new_token()
                self.get_hit().get_response().set_header(
                    'WWW-Authenticate',
                    ('Digest realm="%s", nonce="%s", opaque="0%s", stale=false, '
                     'algorithm=MD5, qop="auth"' % (realm, nonce, nonce)))
                self.respond_now()

    def ensure_signed_in(self):
        """
        Make sure the user is authenticated using the default method.
        """
        self.ensure_signed_in_using_digest()

    def get_fields(self):
        """
        Like Publisher.get_fields(), but abort the connection before
        passing on a RespondNow raised for a bad request.
        """
        try:
            return Publisher.get_fields(self)
        except RespondNow:
            self.abort()
            raise

    def process_hit(self, hit):
        """(hit:Hit)
        This processes the request on the hit and sets the response on the
        hit.  It handles conflict errors and commits changes when there
        are no error exceptions.
        Note that this overrides a method of the base class.
        """
        self.set_hit(hit)
        try:
            # Retry up to five times when the commit conflicts.
            for attempt in range(5):
                try:
                    hit.init_response()
                    self.abort()
                    try:
                        self.fill_response()
                    except RespondNow:
                        pass
                    self.log_hit()
                    self.commit()
                    return
                except ConflictError:
                    self.log_exception()
                    self.abort()
            else:
                raise RuntimeError("too many Conflict errors")
        except:
            # Deliberately broad; see Publisher.handle_exception().
            self.handle_exception()
            self.log_hit()
            self.abort()

    def fill_response(self):
        """
        Use the request to prepare the response.  This method takes
        care of session management, and calls the
        fill_response_with_session_present() method to do the rest.
        Note that this overrides a method of the base class.
        """
        cookie_name = self.get_site().get_name()
        # The session key may arrive as a cookie or as the query string.
        cookie = (get_request().get_cookie(cookie_name) or
                  get_request().get_query())
        if len(cookie) != 16: # len of randbytes(8) cookie generated below
            cookie = None
        session = cookie and self.get_sessions().get(cookie)
        if session and not session.is_valid():
            del self.get_sessions()[cookie]
            session = None
        if session is None:
            session = self.create_session()
        self.get_hit().set_session(session)
        session.access()
        try:
            self.fill_response_using_root_directory()
        finally:
            session = self.get_hit().get_session()
            if session.needs_saving():
                owner = session.get_owner()
                idle_session_cookie = owner.get_idle_session_cookie()
                idle_session = None
                if idle_session_cookie is not None:
                    idle_session = self.get_sessions().get(
                        idle_session_cookie)
                    if idle_session is not None:
                        # Only an idle session of the same owner is reused.
                        if idle_session.get_owner() is not session.get_owner():
                            idle_session = None
                if (idle_session_cookie and idle_session and
                    not idle_session.get_effective_user()):
                    # Recycle idle Session.
                    idle_session.set_authenticated(
                        session.get_effective_user())
                    cookie = idle_session_cookie
                else:
                    # Save new Session.
                    cookie = randbytes(8).decode('latin1')
                    self.get_sessions()[cookie] = session
                get_response().set_cookie(
                    cookie_name, cookie,
                    path=get_request().get_script_name() or "/",
                    secure=(get_request().get_scheme() == 'https'))

    def format_exception_report(self):
        """() -> str
        This returns a string that reports an error exception.
        The base class report is prefixed with the user information.
        """
        report = "user = %s\n\n" % self.format_user_info_for_log()
        report += Publisher.format_exception_report(self)
        return report

    def format_user_info_for_log(self):
        """() -> str
        Return "<owner> <user>" for the hit log: '-' stands for the ''
        user id, and '+' for an effective user who is also the session
        owner.  Spaces in ids are replaced by underscores.
        """
        def format_id(user):
            user_id = user.get_id()
            if user_id == '':
                return '-'
            try:
                return str(user_id).replace(' ', '_')
            except UnicodeDecodeError:
                return repr(user_id).replace(' ', '_')
        session = get_session()
        owner = session.get_owner()
        owner_part = format_id(owner)
        user = session.get_effective_user()
        if user.get_id() != '' and user is owner:
            user_part = '+'
        else:
            user_part = format_id(user)
        return owner_part + " " + user_part

add_getters(DurusPublisher)