Source code for flask.ext.admin.contrib.fileadmin

import os
import os.path as op
import platform
import re
import shutil

from datetime import datetime
from operator import itemgetter
from werkzeug import secure_filename

from flask import flash, redirect, abort, request, send_file

from wtforms import fields, validators

from flask.ext.admin import form, helpers
from flask.ext.admin._compat import urljoin, as_unicode
from flask.ext.admin.base import BaseView, expose
from flask.ext.admin.actions import action, ActionsMixin
from flask.ext.admin.babel import gettext, lazy_gettext


class NameForm(form.BaseForm):
    """
        Form with a filename input field.

        Validates if provided name is valid for *nix and Windows systems.
    """
    name = fields.StringField()

    # Accepts a name only if it:
    #   * is not a reserved Windows device name (PRN, AUX, NUL, CON, COMn,
    #     LPTn, CLOCK$), with or without an extension,
    #   * does not start with a dot,
    #   * contains no control characters and none of \ ? * : " ; | /
    regexp = re.compile(r'^(?!^(PRN|AUX|CLOCK\$|NUL|CON|COM\d|LPT\d|\..*)(\..+)?$)[^\x00-\x1f\\?*:\";|/]+$')

    def validate_name(self, field):
        """
            Inline validator for the ``name`` field.

            :param field:
                Bound ``name`` field being validated
            :raises validators.ValidationError:
                If the submitted name is empty, missing or does not match
                ``regexp``
        """
        # ``field.data`` may be None when no value was submitted for the
        # field; ``regexp.match(None)`` would raise TypeError instead of
        # producing a validation error, so treat a missing value as empty.
        if not self.regexp.match(field.data or ''):
            raise validators.ValidationError(gettext('Invalid directory name'))


class UploadForm(form.BaseForm):
    """
        Upload form bound to a FileAdmin instance.

        The owning admin view is asked, through ``is_file_allowed``, whether
        the name of the submitted file is acceptable.
    """
    upload = fields.FileField(lazy_gettext('File to upload'))

    def __init__(self, admin):
        """
            :param admin:
                Admin view consulted by the upload validator
        """
        # Stored before the base initializer runs, as in the original flow.
        self.admin = admin

        form_data = helpers.get_form_data()
        super(UploadForm, self).__init__(form_data)

    def validate_upload(self, field):
        """
            Reject the submission when no file was sent or when the admin
            view does not allow a file with that name.
        """
        uploaded = self.upload.data

        if not uploaded:
            raise validators.ValidationError(gettext('File required.'))

        if not self.admin.is_file_allowed(uploaded.filename):
            raise validators.ValidationError(gettext('Invalid file type.'))


class EditForm(form.BaseForm):
    """
        File edit form with a single required text area for the content.
    """
    content = fields.TextAreaField(
        lazy_gettext('Content'),
        (validators.required(),)
    )


class FileAdmin(BaseView, ActionsMixin):
    """
        Simple file-management interface.

        :param base_path:
            Path to the directory which will be managed
        :param base_url:
            Optional base URL for the directory. Will be used to generate
            static links to the files. If not defined, a route will be created
            to serve uploaded files.

        Sample usage::

            admin = Admin()

            path = op.join(op.dirname(__file__), 'static')
            admin.add_view(FileAdmin(path, '/static/', name='Static Files'))
            admin.setup_app(app)
    """

    can_upload = True
    """
        Is file upload allowed.
    """

    can_download = True
    """
        Is file download allowed.
    """

    can_delete = True
    """
        Is file deletion allowed.
    """

    can_delete_dirs = True
    """
        Is recursive directory deletion allowed.
    """

    can_mkdir = True
    """
        Is directory creation allowed.
    """

    can_rename = True
    """
        Is file and directory renaming allowed.
    """

    allowed_extensions = None
    """
        List of allowed extensions for uploads, in lower case.

        Example::

            class MyAdmin(FileAdmin):
                allowed_extensions = ('swf', 'jpg', 'gif', 'png')
    """

    editable_extensions = tuple()
    """
        List of editable extensions, in lower case.

        Example::

            class MyAdmin(FileAdmin):
                editable_extensions = ('md', 'html', 'txt')
    """

    list_template = 'admin/file/list.html'
    """
        File list template
    """

    upload_template = 'admin/file/form.html'
    """
        File upload template
    """

    mkdir_template = 'admin/file/form.html'
    """
        Directory creation (mkdir) template
    """

    rename_template = 'admin/file/rename.html'
    """
        Rename template
    """

    edit_template = 'admin/file/edit.html'
    """
        Edit template
    """

    upload_form = UploadForm
    """
        Upload form class
    """

    def __init__(self, base_path, base_url=None,
                 name=None, category=None, endpoint=None, url=None,
                 verify_path=True):
        """
            Constructor.

            :param base_path:
                Base file storage location
            :param base_url:
                Base URL for the files
            :param name:
                Name of this view. If not provided, will default to the class
                name.
            :param category:
                View category
            :param endpoint:
                Endpoint name for the view
            :param url:
                URL for view
            :param verify_path:
                Verify if path exists. If set to `True` and path does not
                exist will raise an exception.
            :raises IOError:
                If `verify_path` is true and `base_path` does not exist
        """
        self.base_path = as_unicode(base_path)
        self.base_url = base_url

        self.init_actions()

        self._on_windows = platform.system() == 'Windows'

        # Convert allowed_extensions to set for quick validation
        if (self.allowed_extensions and
                not isinstance(self.allowed_extensions, set)):
            self.allowed_extensions = set(self.allowed_extensions)

        # Convert editable_extensions to set for quick validation
        if (self.editable_extensions and
                not isinstance(self.editable_extensions, set)):
            self.editable_extensions = set(self.editable_extensions)

        # Check if path exists, but only when the caller asked for it.
        # Previously the flag was accepted and documented yet ignored.
        if verify_path and not op.exists(base_path):
            raise IOError('FileAdmin path "%s" does not exist or is not accessible' % base_path)

        super(FileAdmin, self).__init__(name, category, endpoint, url)

    def is_accessible_path(self, path):
        """
            Verify if the provided path is accessible for the current user.

            Override to customize behavior.

            :param path:
                Relative path to the root
        """
        return True

    def get_base_path(self):
        """
            Return base path. Override to customize behavior (per-user
            directories, etc)
        """
        return op.normpath(self.base_path)

    def get_base_url(self):
        """
            Return base URL. Override to customize behavior (per-user
            directories, etc)
        """
        return self.base_url

    def is_file_allowed(self, filename):
        """
            Verify if file can be uploaded.

            Override to customize behavior.

            :param filename:
                Source file name
        """
        ext = op.splitext(filename)[1].lower()

        if ext.startswith('.'):
            ext = ext[1:]

        if self.allowed_extensions and ext not in self.allowed_extensions:
            return False

        return True

    def is_file_editable(self, filename):
        """
            Determine if the file can be edited.

            Override to customize behavior.

            :param filename:
                Source file name
        """
        ext = op.splitext(filename)[1].lower()

        if ext.startswith('.'):
            ext = ext[1:]

        if not self.editable_extensions or ext not in self.editable_extensions:
            return False

        return True

    def is_in_folder(self, base_path, directory):
        """
            Verify that `directory` is in `base_path` folder

            :param base_path:
                Base directory path
            :param directory:
                Directory path to check
            :return:
                `True` if `directory` is `base_path` itself or lies below it
        """
        base = op.normpath(base_path)
        target = op.normpath(directory)

        if target == base:
            return True

        # Compare on a separator boundary. A bare prefix test would treat
        # "/srv/files_secret" as being inside "/srv/files".
        prefix = base if base.endswith(op.sep) else base + op.sep
        return target.startswith(prefix)

    def save_file(self, path, file_data):
        """
            Save uploaded file to the disk

            :param path:
                Path to save to
            :param file_data:
                Werkzeug `FileStorage` object
        """
        file_data.save(path)

    def _get_dir_url(self, endpoint, path=None, **kwargs):
        """
            Return prettified URL

            :param endpoint:
                Endpoint name
            :param path:
                Directory path
            :param kwargs:
                Additional arguments. Only used when `path` is not empty.
        """
        if not path:
            return self.get_url(endpoint)
        else:
            if self._on_windows:
                path = path.replace('\\', '/')

            kwargs['path'] = path

            return self.get_url(endpoint, **kwargs)

    def _get_file_url(self, path):
        """
            Return static file url

            :param path:
                Static file path
        """
        if self.is_file_editable(path):
            route = '.edit'
        else:
            route = '.download'

        return self.get_url(route, path=path)

    def _normalize_path(self, path):
        """
            Verify and normalize path.

            If the path is not relative to the base directory, will raise a
            404 exception.

            If the path does not exist, this will also raise a 404 exception.

            :param path:
                Path relative to the base directory, or `None` for the base
                directory itself
            :return:
                Tuple of (base path, absolute path, normalized relative path)
        """
        base_path = self.get_base_path()

        if path is None:
            directory = base_path
            path = ''
        else:
            path = op.normpath(path)
            directory = op.normpath(op.join(base_path, path))

            if not self.is_in_folder(base_path, directory):
                abort(404)

        if not op.exists(directory):
            abort(404)

        return base_path, directory, path

    def is_action_allowed(self, name):
        """
            Verify if the bulk action `name` is enabled for this view.

            :param name:
                Action name
        """
        if name == 'delete' and not self.can_delete:
            return False
        # Truthiness check, so an `editable_extensions` of None is handled
        # the same way as in `is_file_editable` instead of failing in len().
        elif name == 'edit' and not self.editable_extensions:
            return False

        return True

    def on_rename(self, full_path, dir_base, filename):
        """
            Perform some actions after a file or directory has been renamed.

            Called from rename method

            By default do nothing.
        """
        pass

    def on_edit_file(self, full_path, path):
        """
            Perform some actions after a file has been successfully changed.

            Called from edit method

            By default do nothing.
        """
        pass

    def on_file_upload(self, directory, path, filename):
        """
            Perform some actions after a file has been successfully uploaded.

            Called from upload method

            By default do nothing.
        """
        pass

    def on_mkdir(self, parent_dir, dir_name):
        """
            Perform some actions after a directory has successfully been
            created.

            Called from mkdir method

            By default do nothing.
        """
        pass

    def on_directory_delete(self, full_path, dir_name):
        """
            Perform some actions after a directory has successfully been
            deleted.

            Called from delete method

            By default do nothing.
        """
        pass

    def on_file_delete(self, full_path, filename):
        """
            Perform some actions after a file has successfully been deleted.

            Called from delete method

            By default do nothing.
        """
        pass

    def _save_form_files(self, directory, path, form):
        """
            Store the file from a validated upload form in `directory`.

            An existing file is never overwritten; an error is flashed
            instead.

            :param directory:
                Absolute path of the target directory
            :param path:
                Target directory path relative to the base directory
            :param form:
                Validated upload form
        """
        filename = op.join(directory,
                           secure_filename(form.upload.data.filename))

        if op.exists(filename):
            flash(gettext('File "%(name)s" already exists.', name=filename),
                  'error')
        else:
            self.save_file(filename, form.upload.data)
            self.on_file_upload(directory, path, filename)

    @expose('/')
    @expose('/b/<path:path>')
    def index(self, path=None):
        """
            Index view method

            :param path:
                Optional directory path. If not provided, will use the base
                directory
        """
        # Get path and verify if it is valid
        base_path, directory, path = self._normalize_path(path)

        if not self.is_accessible_path(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        # Get directory listing. Every entry is a 5-tuple of
        # (name, relative path, is directory, size, modification time).
        items = []

        # Parent directory
        if directory != base_path:
            parent_path = op.normpath(op.join(path, '..'))
            if parent_path == '.':
                parent_path = None

            # Must be a 5-tuple like the other entries: the sort below reads
            # index 4, and a shorter tuple raised IndexError in every
            # sub-directory.
            items.append(('..', parent_path, True, 0, 0))

        for f in os.listdir(directory):
            fp = op.join(directory, f)
            rel_path = op.join(path, f)

            if self.is_accessible_path(rel_path):
                items.append((f, rel_path, op.isdir(fp), op.getsize(fp), op.getmtime(fp)))

        # Sort by name
        items.sort(key=itemgetter(0))

        # Sort by type
        items.sort(key=itemgetter(2), reverse=True)

        # Sort by modified date
        # NOTE(review): the key starts with the entry name, so this last pass
        # effectively orders entries by name, descending, and overrides the
        # two sorts above -- confirm that this is intended.
        items.sort(key=lambda values: (values[0], values[1], values[2], values[3],
                                       datetime.fromtimestamp(values[4])),
                   reverse=True)

        # Generate breadcrumbs
        accumulator = []
        breadcrumbs = []
        for n in path.split(os.sep):
            accumulator.append(n)
            breadcrumbs.append((n, op.join(*accumulator)))

        # Actions
        actions, actions_confirmation = self.get_actions_list()

        return self.render(self.list_template,
                           dir_path=path,
                           breadcrumbs=breadcrumbs,
                           get_dir_url=self._get_dir_url,
                           get_file_url=self._get_file_url,
                           items=items,
                           actions=actions,
                           actions_confirmation=actions_confirmation)

    @expose('/upload/', methods=('GET', 'POST'))
    @expose('/upload/<path:path>', methods=('GET', 'POST'))
    def upload(self, path=None):
        """
            Upload view method

            :param path:
                Optional directory path. If not provided, will use the base
                directory
        """
        # Get path and verify if it is valid
        base_path, directory, path = self._normalize_path(path)

        if not self.can_upload:
            flash(gettext('File uploading is disabled.'), 'error')
            return redirect(self._get_dir_url('.index', path))

        if not self.is_accessible_path(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        form = self.upload_form(self)
        if helpers.validate_form_on_submit(form):
            try:
                self._save_form_files(directory, path, form)
                return redirect(self._get_dir_url('.index', path))
            except Exception as ex:
                # Flashed with the 'error' category, like every other failure
                # message in this view class.
                flash(gettext('Failed to save file: %(error)s', error=ex),
                      'error')

        return self.render(self.upload_template, form=form)

    @expose('/download/<path:path>')
    def download(self, path=None):
        """
            Download view method.

            :param path:
                File path.
        """
        if not self.can_download:
            abort(404)

        base_path, directory, path = self._normalize_path(path)

        # Enforce the same per-path access check as the other views, so a
        # path hidden by `is_accessible_path` cannot be fetched directly.
        if not self.is_accessible_path(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        # backward compatibility with base_url
        base_url = self.get_base_url()
        if base_url:
            base_url = urljoin(self.get_url('.index'), base_url)
            return redirect(urljoin(base_url, path))

        return send_file(directory)

    @expose('/mkdir/', methods=('GET', 'POST'))
    @expose('/mkdir/<path:path>', methods=('GET', 'POST'))
    def mkdir(self, path=None):
        """
            Directory creation view method

            :param path:
                Optional directory path. If not provided, will use the base
                directory
        """
        # Get path and verify if it is valid
        base_path, directory, path = self._normalize_path(path)

        dir_url = self._get_dir_url('.index', path)

        if not self.can_mkdir:
            flash(gettext('Directory creation is disabled.'), 'error')
            return redirect(dir_url)

        if not self.is_accessible_path(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        form = NameForm(helpers.get_form_data())

        if helpers.validate_form_on_submit(form):
            try:
                os.mkdir(op.join(directory, form.name.data))
                self.on_mkdir(directory, form.name.data)
                return redirect(dir_url)
            except Exception as ex:
                flash(gettext('Failed to create directory: %(error)s', error=ex),
                      'error')

        return self.render(self.mkdir_template,
                           form=form,
                           dir_url=dir_url)

    @expose('/delete/', methods=('POST',))
    def delete(self):
        """
            Delete view method

            The path to delete is read from the `path` form field.
        """
        path = request.form.get('path')

        if not path:
            return redirect(self.get_url('.index'))

        # Get path and verify if it is valid
        base_path, full_path, path = self._normalize_path(path)

        return_url = self._get_dir_url('.index', op.dirname(path))

        if not self.can_delete:
            flash(gettext('Deletion is disabled.'))
            return redirect(return_url)

        if not self.is_accessible_path(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        if op.isdir(full_path):
            if not self.can_delete_dirs:
                flash(gettext('Directory deletion is disabled.'))
                return redirect(return_url)

            try:
                shutil.rmtree(full_path)
                self.on_directory_delete(full_path, path)
                flash(gettext('Directory "%(path)s" was successfully deleted.', path=path))
            except Exception as ex:
                flash(gettext('Failed to delete directory: %(error)s', error=ex), 'error')
        else:
            try:
                os.remove(full_path)
                self.on_file_delete(full_path, path)
                flash(gettext('File "%(name)s" was successfully deleted.', name=path))
            except Exception as ex:
                flash(gettext('Failed to delete file: %(name)s', name=ex), 'error')

        return redirect(return_url)

    @expose('/rename/', methods=('GET', 'POST'))
    def rename(self):
        """
            Rename view method

            The path to rename is read from the `path` query argument.
        """
        path = request.args.get('path')

        if not path:
            return redirect(self.get_url('.index'))

        base_path, full_path, path = self._normalize_path(path)

        return_url = self._get_dir_url('.index', op.dirname(path))

        if not self.can_rename:
            flash(gettext('Renaming is disabled.'))
            return redirect(return_url)

        if not self.is_accessible_path(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        if not op.exists(full_path):
            flash(gettext('Path does not exist.'))
            return redirect(return_url)

        form = NameForm(helpers.get_form_data(), name=op.basename(path))
        if helpers.validate_form_on_submit(form):
            try:
                dir_base = op.dirname(full_path)
                filename = secure_filename(form.name.data)

                os.rename(full_path, op.join(dir_base, filename))
                self.on_rename(full_path, dir_base, filename)
                flash(gettext('Successfully renamed "%(src)s" to "%(dst)s"',
                              src=op.basename(path),
                              dst=filename))
            except Exception as ex:
                flash(gettext('Failed to rename: %(error)s', error=ex), 'error')

            return redirect(return_url)

        return self.render(self.rename_template,
                           form=form,
                           path=op.dirname(path),
                           name=op.basename(path),
                           dir_url=return_url)

    @expose('/edit/', methods=('GET', 'POST'))
    def edit(self):
        """
            Edit view method

            Reads one or more `path` query arguments. The first one is
            edited; when more are present, a successful save redirects to the
            edit view for the remaining paths.
        """
        next_url = None

        path = request.args.getlist('path')
        if not path:
            return redirect(self.get_url('.index'))

        if len(path) > 1:
            next_url = self.get_url('.edit', path=path[1:])

        path = path[0]

        base_path, full_path, path = self._normalize_path(path)

        if not self.is_accessible_path(path) or not self.is_file_editable(path):
            flash(gettext('Permission denied.'))
            return redirect(self._get_dir_url('.index'))

        dir_url = self._get_dir_url('.index', op.dirname(path))
        next_url = next_url or dir_url

        form = EditForm(helpers.get_form_data())
        error = False

        if helpers.validate_form_on_submit(form):
            form.process(request.form, content='')
            if form.validate():
                try:
                    # Written as UTF-8 bytes so the result does not depend on
                    # the locale encoding and matches the decoding done when
                    # the file is loaded below.
                    with open(full_path, 'wb') as f:
                        f.write(request.form['content'].encode('utf8'))
                except IOError:
                    flash(gettext("Error saving changes to %(name)s.", name=path), 'error')
                    error = True
                else:
                    self.on_edit_file(full_path, path)
                    flash(gettext("Changes to %(name)s saved successfully.", name=path))
                    return redirect(next_url)
        else:
            try:
                # Read raw bytes: text mode yields `str` on Python 3, which
                # has no decode() and made every file uneditable.
                with open(full_path, 'rb') as f:
                    content = f.read()
            except IOError:
                flash(gettext("Error reading %(name)s.", name=path), 'error')
                error = True
            except Exception:
                flash(gettext("Unexpected error while reading from %(name)s", name=path), 'error')
                error = True
            else:
                try:
                    content = content.decode('utf8')
                except UnicodeDecodeError:
                    flash(gettext("Cannot edit %(name)s.", name=path), 'error')
                    error = True
                except Exception:
                    flash(gettext("Unexpected error while reading from %(name)s", name=path), 'error')
                    error = True
                else:
                    form.content.data = content

        return self.render(self.edit_template, dir_url=dir_url, path=path,
                           form=form, error=error)

    @expose('/action/', methods=('POST',))
    def action_view(self):
        """
            Dispatch a submitted bulk action through `handle_action`.
        """
        return self.handle_action()

    # Actions
    @action('delete',
            lazy_gettext('Delete'),
            lazy_gettext('Are you sure you want to delete these files?'))
    def action_delete(self, items):
        """
            Bulk action: remove every accessible file in `items`.

            :param items:
                Paths relative to the base directory
        """
        if not self.can_delete:
            flash(gettext('File deletion is disabled.'), 'error')
            return

        for path in items:
            base_path, full_path, path = self._normalize_path(path)

            if self.is_accessible_path(path):
                try:
                    os.remove(full_path)
                    flash(gettext('File "%(name)s" was successfully deleted.', name=path))
                except Exception as ex:
                    flash(gettext('Failed to delete file: %(name)s', name=ex), 'error')

    @action('edit', lazy_gettext('Edit'))
    def action_edit(self, items):
        """
            Bulk action: redirect to the edit view for the selected paths.

            :param items:
                Paths relative to the base directory
        """
        return redirect(self.get_url('.edit', path=items))

Related Topics