# -*- coding: utf-8 -*-
"""
flask_pluginkit._installer
~~~~~~~~~~~~~~~~~~~~~~~~~~
installer: install or remove plugin.
:copyright: (c) 2019 by staugur.
:license: BSD 3-Clause, see LICENSE for more details.
"""
import os
import re
import shutil
import tarfile
import zipfile
from sys import executable
from subprocess import call
from cgi import parse_header
from posixpath import basename
from tempfile import NamedTemporaryFile
from .exceptions import PluginError, TarError, ZipError, InstallError
from ._compat import PY2, string_types, urllib2, urlsplit, parse_qs
from .utils import check_url
[docs]class PluginInstaller(object):
"""plugin installer for installing a compressed local/remote plugin"""
def __init__(self, plugin_abspath, **kwargs):
"""
:param plugin_abspath: the absolute path to the plugin directory.
"""
self.plugin_abspath = plugin_abspath
if not os.path.isdir(self.plugin_abspath):
raise PluginError("Not Found Plugin Directory")
def __isValidTGZ(self, suffix):
"""To determine whether the suffix `.tar.gz` or `.tgz` format"""
if suffix and isinstance(suffix, string_types):
if suffix.endswith(".tar.gz") or suffix.endswith(".tgz"):
return True
return False
def __isValidZIP(self, suffix):
"""Determine if the suffix is `.zip` format"""
if suffix and isinstance(suffix, string_types):
if suffix.endswith(".zip"):
return True
return False
def __isValidFilename(self, filename):
"""Determine whether filename is valid"""
if filename and isinstance(filename, string_types):
if re.match(r'^[\w\d\_\-\.]+$', filename.split('/')[-1], re.I):
if self.__isValidTGZ(filename) or self.__isValidZIP(filename):
return True
return False
def __getFilename(self, data, scene=1):
"""To get the data from different scenarios in the filename,
scene value see `remote_download` in 1, 2, 3, 4
"""
filename = None
try:
if scene == 1:
plugin_filename = [i for i in parse_qs(urlsplit(data).query)
.get("plugin_filename") or [] if i]
if plugin_filename and len(plugin_filename) == 1:
filename = plugin_filename[0]
elif scene == 2:
filename = basename(urlsplit(data).path)
elif scene == 3:
if PY2:
cd = data.headers.getheader("Content-Disposition", "")
else:
cd = data.getheader("Content-Disposition", "")
filename = parse_header(cd)[-1].get("filename")
elif scene == 4:
if PY2:
cd = data.info().subtype
else:
cd = data.info().get_content_subtype()
mt = {'zip': 'zip', 'x-compressed-tar': 'tar.gz',
'x-gzip': 'tar.gz'}
subtype = mt.get(cd)
if subtype:
filename = "." + subtype
finally:
if self.__isValidFilename(filename):
return filename
def __getFilenameSuffix(self, filename):
"""Gets the filename suffix"""
if filename and isinstance(filename, string_types):
if self.__isValidTGZ(filename):
return ".tar.gz"
elif filename.endswith(".zip"):
return ".zip"
def __unpack_tgz(self, filename):
"""Unpack the `tar.gz`, `tgz` compressed file format"""
if isinstance(filename, string_types) and \
self.__isValidTGZ(filename) and \
tarfile.is_tarfile(filename):
with tarfile.open(filename, mode='r:gz') as t:
for name in t.getnames():
t.extract(name, self.plugin_abspath)
else:
raise TarError("Invalid Plugin Compressed File")
def __unpack_zip(self, filename):
"""Unpack the `zip` compressed file format"""
if isinstance(filename, string_types) and \
self.__isValidZIP(filename) and \
zipfile.is_zipfile(filename):
with zipfile.ZipFile(filename) as z:
for name in z.namelist():
z.extract(name, self.plugin_abspath)
else:
raise ZipError("Invalid Plugin Compressed File")
def _remote_download(self, url):
"""To download the remote plugin package,
there are four methods of setting filename according to priority,
each of which stops setting when a qualified filename is obtained,
and an exception is triggered when a qualified valid filename is
ultimately unavailable.
1. Add url `plugin_filename` query parameters
2. The file name is resolved in the url, eg: http://xx.xx.com/plugin-v0.0.1.tar.gz
3. Parse the Content-Disposition in the return header
4. Parse the Content-Type in the return header
"""
#: Try to set filename in advance based on the previous two steps
if check_url(url):
filename = self.__getFilename(url, scene=1)
if not filename:
filename = self.__getFilename(url, scene=2)
#: fix UnboundLocalError
f = None
try:
f = urllib2.urlopen(url, timeout=10)
except (AttributeError, ValueError, urllib2.URLError):
raise InstallError("Open URL Error")
else:
if not filename:
filename = self.__getFilename(f, scene=3)
if not filename:
filename = self.__getFilename(f, scene=4)
if filename and self.__isValidFilename(filename):
suffix = self.__getFilenameSuffix(filename)
with NamedTemporaryFile(mode='w+b', prefix='fpk-',
suffix=suffix, delete=False) as fp:
fp.write(f.read())
filename = fp.name
try:
if self.__isValidTGZ(suffix):
self.__unpack_tgz(filename)
else:
self.__unpack_zip(filename)
finally:
os.remove(filename)
else:
raise InstallError("Invalid Filename")
finally:
if f is not None:
f.close()
else:
raise InstallError("Invalid URL")
def _local_upload(self, filepath, remove=False):
"""Local plugin package processing"""
if os.path.isfile(filepath):
filename = os.path.basename(os.path.abspath(filepath))
if filename and self.__isValidFilename(filename):
suffix = self.__getFilenameSuffix(filename)
try:
if self.__isValidTGZ(suffix):
self.__unpack_tgz(os.path.abspath(filepath))
else:
self.__unpack_zip(os.path.abspath(filepath))
finally:
if remove is True:
os.remove(filepath)
else:
raise InstallError("Invalid Filename")
else:
raise InstallError("Invalid Filepath")
def _pip_install(self, package_or_url):
"""Use the pip command to install third-party modules.
.. versionadded:: 3.3.0
"""
res = dict(code=1, msg=None)
if package_or_url and \
isinstance(package_or_url, string_types) and \
package_or_url not in (".", "-"):
code = call([executable, "-m", "pip", "install", package_or_url])
res.update(code=code)
if code != 0:
res.update(msg="Installation failed with pip command")
else:
res.update(msg="Invalid parameter package_or_url", code=1)
return res
[docs] def addPlugin(self, method="remote", **kwargs):
"""Add a plugin, support only for `.tar.gz` or `.zip` compression packages.
:param method: supported method:
``remote``, download and unpack a remote plugin package;
``local``, unzip a local plugin package.
``pip``, install package with pip command.
:param url: for method is remote,
plugin can be downloaded from the address.
:param filepath: for method is local, plugin local absolute path
:param remove: for method is local, remove the plugin source code
package, default is False.
:param package_or_url: for method is pip, pypi's package or VCS url.
:returns: the result of adding the plugin, like {msg:str, code:int},
code=0 is successful.
.. versionchanged:: 3.3.0
Add pip method, with package_or_url param.
"""
res = dict(code=1, msg=None)
try:
if method == "remote":
self._remote_download(kwargs["url"])
elif method == "local":
self._local_upload(kwargs["filepath"],
kwargs.get("remove", False))
elif method == "pip": # pragma: nocover
res = self._pip_install(kwargs["package_or_url"])
else:
res.update(msg="Invalid method")
except Exception as e:
res.update(msg=str(e))
else:
if method != "pip":
res.update(code=0)
return res
[docs] def removePlugin(self, package):
"""Remove a local plugin
:param package: The plugin package name, not __plugin_name.
"""
res = dict(code=1, msg=None)
if package and isinstance(package, string_types):
path = os.path.join(self.plugin_abspath, package)
if os.path.isdir(path):
try:
shutil.rmtree(path)
except Exception as e:
res.update(msg=str(e))
else:
res.update(code=0)
else:
res.update(msg="No Such Package")
else:
res.update(msg="Invalid Package Format")
return res