diff --git a/MountEFI.command b/MountEFI.command index 1bdaa97..b95e3b3 100755 --- a/MountEFI.command +++ b/MountEFI.command @@ -1,345 +1,4 @@ #!/usr/bin/env python -# 0.0.0 -from Scripts import * -import os, tempfile, datetime, shutil, time, plistlib, json, sys, argparse +from mount_efi.__main__ import main -class MountEFI: - def __init__(self, **kwargs): - self.r = run.Run() - self.d = disk.Disk() - self.dl = downloader.Downloader() - self.u = utils.Utils("MountEFI") - self.re = reveal.Reveal() - # Get the tools we need - self.script_folder = "Scripts" - self.update_url = "https://raw.githubusercontent.com/corpnewt/MountEFIv2/master/MountEFI.command" - - self.settings_file = kwargs.get("settings", None) - cwd = os.getcwd() - os.chdir(os.path.dirname(os.path.realpath(__file__))) - if self.settings_file and os.path.exists(self.settings_file): - self.settings = json.load(open(self.settings_file)) - else: - self.settings = { - # Default settings here - "default_disk" : None, - "after_mount" : None, - "full_layout" : False, - "skip_countdown" : False, - } - os.chdir(cwd) - self.full = self.settings.get("full_layout", False) - - def check_update(self): - # Checks against https://raw.githubusercontent.com/corpnewt/MountEFIv2/master/MountEFI.command to see if we need to update - self.u.head("Checking for Updates") - print(" ") - with open(os.path.realpath(__file__), "r") as f: - # Our version should always be the second line - version = get_version(f.read()) - print(version) - try: - new_text = _get_string(url) - new_version = get_version(new_text) - except: - # Not valid json data - print("Error checking for updates (network issue)") - return - - if version == new_version: - # The same - return - print("v{} is already current.".format(version)) - return - # Split the version number - try: - v = version.split(".") - cv = new_version.split(".") - except: - # not formatted right - bail - print("Error checking for updates (version string malformed)") - return - - if not need_update(cv, v): - print("v{} is already current.".format(version)) - return - - # Update - with open(os.path.realpath(__file__), "w") as f: - f.write(new_text) - - # chmod +x, then restart - run_command(["chmod", "+x", __file__]) - os.execv(__file__, sys.argv) - - def flush_settings(self): - if self.settings_file: - cwd = os.getcwd() - os.chdir(os.path.dirname(os.path.realpath(__file__))) - json.dump(self.settings, open(self.settings_file, "w")) - os.chdir(cwd) - - def after_mount(self): - self.u.resize(80, 24) - self.u.head("After Mount Action") - print(" ") - print("1. Return to Menu") - print("2. Quit") - print("3. Open EFI and Return to Menu") - print("4. Open EFI and Quit") - if not self.settings.get("skip_countdown", False): - print("5. Skip After-Mount Countdown") - else: - print("5. Use After-Mount Countdown") - print(" ") - print("M. Main Menu") - print("Q. Quit") - print(" ") - menu = self.u.grab("Please pick an option: ") - if not len(menu): - self.after_mount() - return - menu = menu.lower() - if menu in ["1","2","3","4"]: - self.settings["after_mount"] = [ - "Return to Menu", - "Quit", - "Reveal and Return to Menu", - "Reveal and Quit" - ][int(menu)-1] - self.flush_settings() - return - elif menu == "5": - cd = self.settings.get("skip_countdown", False) - cd ^= True - self.settings["skip_countdown"] = cd - self.flush_settings() - self.after_mount() - return - elif menu == "m": - return - elif menu == "q": - self.u.custom_quit() - self.after_mount() - - def default_disk(self): - self.d.update() - clover = bdmesg.get_bootloader_uuid() - print(clover) - self.u.resize(80, 24) - self.u.head("Select Default Disk") - print(" ") - print("1. None") - print("2. Boot Disk") - if clover: - print("3. Booted EFI (Clover/OC)") - print(" ") - print("M. Main Menu") - print("Q. Quit") - print(" ") - menu = self.u.grab("Please pick a default disk: ") - if not len(menu): - self.default_disk() - menu = menu.lower() - if menu in ["1","2"]: - self.settings["default_disk"] = [None, "boot"][int(menu)-1] - self.flush_settings() - return - elif menu == "3" and clover: - self.settings["default_disk"] = "clover" - self.flush_settings() - return - elif menu == "m": - return - elif menu == "q": - self.u.custom_quit() - self.default_disk() - - def get_efi(self): - self.d.update() - clover = bdmesg.get_bootloader_uuid() - i = 0 - disk_string = "" - if not self.full: - clover_disk = self.d.get_parent(clover) - mounts = self.d.get_mounted_volume_dicts() - for d in mounts: - i += 1 - disk_string += "{}. {} ({})".format(i, d["name"], d["identifier"]) - if self.d.get_parent(d["identifier"]) == clover_disk: - # if d["disk_uuid"] == clover: - disk_string += " *" - disk_string += "\n" - else: - mounts = self.d.get_disks_and_partitions_dict() - disks = mounts.keys() - for d in disks: - i += 1 - disk_string+= "{}. {}:\n".format(i, d) - parts = mounts[d]["partitions"] - part_list = [] - for p in parts: - p_text = " - {} ({})".format(p["name"], p["identifier"]) - if p["disk_uuid"] == clover: - # Got Clover - p_text += " *" - part_list.append(p_text) - if len(part_list): - disk_string += "\n".join(part_list) + "\n" - height = len(disk_string.split("\n"))+16 - if height < 24: - height = 24 - self.u.resize(80, height) - self.u.head() - print(" ") - print(disk_string) - if not self.full: - print("S. Switch to Full Output") - else: - print("S. Switch to Slim Output") - lay = self.settings.get("full_layout", False) - l_str = "Slim" - if lay: - l_str = "Full" - print("L. Set As Default Layout (Current: {})".format(l_str)) - print("B. Mount the Boot Drive's EFI") - if clover: - print("C. Mount the Booted EFI (Clover/OC)") - print("") - - dd = self.settings.get("default_disk", None) - if dd == "clover": - dd = clover - elif dd == "boot": - dd = "/" - di = self.d.get_identifier(dd) - if di: - print("D. Pick Default Disk ({} - {})".format(self.d.get_volume_name(di), di)) - else: - print("D. Pick Default Disk (None Set)") - - am = self.settings.get("after_mount", None) - if not am: - am = "Return to Menu" - print("M. After Mounting: "+am) - print("Q. Quit") - print(" ") - print("(* denotes the booted EFI (Clover/OC))") - - menu = self.u.grab("Pick the drive containing your EFI: ") - if not len(menu): - if not di: - return self.get_efi() - return self.d.get_efi(di) - menu = menu.lower() - if menu == "q": - self.u.resize(80,24) - self.u.custom_quit() - elif menu == "s": - self.full ^= True - return self.get_efi() - elif menu == "b": - return self.d.get_efi("/") - elif menu == "c" and clover: - return self.d.get_efi(clover) - elif menu == "m": - self.after_mount() - return - elif menu == "d": - self.default_disk() - return - elif menu == "l": - self.settings["full_layout"] = self.full - self.flush_settings() - return - try: - disk_iden = int(menu) - if not (disk_iden > 0 and disk_iden <= len(mounts)): - # out of range! - self.u.grab("Invalid disk!", timeout=3) - return self.get_efi() - if type(mounts) is list: - # We have the small list - disk = mounts[disk_iden-1]["identifier"] - else: - # We have the dict - disk = mounts.keys()[disk_iden-1] - except: - disk = menu - iden = self.d.get_identifier(disk) - name = self.d.get_volume_name(disk) - if not iden: - self.u.grab("Invalid disk!", timeout=3) - return self.get_efi() - # Valid disk! - return self.d.get_efi(iden) - - def main(self): - while True: - efi = self.get_efi() - if not efi: - # Got nothing back - continue - # Mount the EFI partition - self.u.head("Mounting {}".format(efi)) - print(" ") - out = self.d.mount_partition(efi) - if out[2] == 0: - print(out[0]) - else: - print(out[1]) - # Check our settings - am = self.settings.get("after_mount", None) - if not am: - continue - if "reveal" in am.lower(): - # Reveal - mp = self.d.get_mount_point(efi) - if mp: - self.r.run({"args":["open", mp]}) - # Hang out for a couple seconds - if not self.settings.get("skip_countdown", False): - self.u.grab("", timeout=3) - if "quit" in am.lower(): - # Quit - self.u.resize(80,24) - self.u.custom_quit() - - def quiet_mount(self, disk_list, unmount=False): - ret = 0 - for disk in disk_list: - ident = self.d.get_identifier(disk) - if not ident: - continue - efi = self.d.get_efi(ident) - if not efi: - continue - if unmount: - out = self.d.unmount_partition(efi) - else: - out = self.d.mount_partition(efi) - if not out[2] == 0: - ret = out[2] - exit(ret) - -if __name__ == '__main__': - # Setup the cli args - parser = argparse.ArgumentParser(prog="MountEFI.command", description="MountEFI - an EFI Mounting Utility by CorpNewt") - parser.add_argument("-u", "--unmount", help="unmount instead of mount the passed EFIs", action="store_true") - parser.add_argument("-p", "--print-efi", help="prints the disk#s# of the EFI attached to the passed var") - parser.add_argument("disks",nargs="*") - - args = parser.parse_args() - - m = MountEFI(settings="./Scripts/settings.json") - # Gather defaults - unmount = False - if args.unmount: - unmount = True - if args.print_efi: - print("{}".format(m.d.get_efi(args.print_efi))) - # Check for args - if len(args.disks): - # We got command line args! - m.quiet_mount(args.disks, unmount) - elif not args.print_efi: - m.main() +main() diff --git a/README.md b/README.md index ba71c9d..57ab254 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # MountEFI + An *even* more robust edition of my previous MountEFI scripts. Other scripts can call this script to do a silent mount - and receive a 0 on succes, or 1 (or higher) on failure. @@ -13,6 +14,10 @@ Do the following one line at a time in Terminal: git clone https://github.com/corpnewt/MountEFI cd MountEFI - chmod +x MountEFI.command - -Then run with either `./MountEFI.command` or by double-clicking *MountEFI.command* + pip install . + +Or: + + pip install MountEFI + +Then run with either `mount-efi` or `MountEFI`. diff --git a/Scripts/__init__.py b/Scripts/__init__.py deleted file mode 100644 index 962c1d3..0000000 --- a/Scripts/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from os.path import dirname, basename, isfile -import glob -modules = glob.glob(dirname(__file__)+"/*.py") -__all__ = [ basename(f)[:-3] for f in modules if isfile(f) and not f.endswith('__init__.py')] \ No newline at end of file diff --git a/Scripts/downloader.py b/Scripts/downloader.py deleted file mode 100755 index 579f9c9..0000000 --- a/Scripts/downloader.py +++ /dev/null @@ -1,130 +0,0 @@ -import sys, os, time, ssl, gzip -from io import BytesIO -# Python-aware urllib stuff -if sys.version_info >= (3, 0): - from urllib.request import urlopen, Request -else: - # Import urllib2 to catch errors - import urllib2 - from urllib2 import urlopen, Request - -class Downloader: - - def __init__(self,**kwargs): - self.ua = kwargs.get("useragent",{"User-Agent":"Mozilla"}) - self.chunk = 1048576 # 1024 x 1024 i.e. 1MiB - - # Provide reasonable default logic to workaround macOS CA file handling - cafile = ssl.get_default_verify_paths().openssl_cafile - try: - # If default OpenSSL CA file does not exist, use that from certifi - if not os.path.exists(cafile): - import certifi - cafile = certifi.where() - self.ssl_context = ssl.create_default_context(cafile=cafile) - except: - # None of the above worked, disable certificate verification for now - self.ssl_context = ssl._create_unverified_context() - return - - def _decode(self, value, encoding="utf-8", errors="ignore"): - # Helper method to only decode if bytes type - if sys.version_info >= (3,0) and isinstance(value, bytes): - return value.decode(encoding,errors) - return value - - def open_url(self, url, headers = None): - # Fall back on the default ua if none provided - headers = self.ua if headers == None else headers - # Wrap up the try/except block so we don't have to do this for each function - try: - response = urlopen(Request(url, headers=headers), context=self.ssl_context) - except Exception as e: - # No fixing this - bail - return None - return response - - def get_size(self, size, suffix=None, use_1024=False, round_to=2, strip_zeroes=False): - # size is the number of bytes - # suffix is the target suffix to locate (B, KB, MB, etc) - if found - # use_2014 denotes whether or not we display in MiB vs MB - # round_to is the number of dedimal points to round our result to (0-15) - # strip_zeroes denotes whether we strip out zeroes - - # Failsafe in case our size is unknown - if size == -1: - return "Unknown" - # Get our suffixes based on use_1024 - ext = ["B","KiB","MiB","GiB","TiB","PiB"] if use_1024 else ["B","KB","MB","GB","TB","PB"] - div = 1024 if use_1024 else 1000 - s = float(size) - s_dict = {} # Initialize our dict - # Iterate the ext list, and divide by 1000 or 1024 each time to setup the dict {ext:val} - for e in ext: - s_dict[e] = s - s /= div - # Get our suffix if provided - will be set to None if not found, or if started as None - suffix = next((x for x in ext if x.lower() == suffix.lower()),None) if suffix else suffix - # Get the largest value that's still over 1 - biggest = suffix if suffix else next((x for x in ext[::-1] if s_dict[x] >= 1), "B") - # Determine our rounding approach - first make sure it's an int; default to 2 on error - try:round_to=int(round_to) - except:round_to=2 - round_to = 0 if round_to < 0 else 15 if round_to > 15 else round_to # Ensure it's between 0 and 15 - bval = round(s_dict[biggest], round_to) - # Split our number based on decimal points - a,b = str(bval).split(".") - # Check if we need to strip or pad zeroes - b = b.rstrip("0") if strip_zeroes else b.ljust(round_to,"0") if round_to > 0 else "" - return "{:,}{} {}".format(int(a),"" if not b else "."+b,biggest) - - def _progress_hook(self, response, bytes_so_far, total_size): - if total_size > 0: - percent = float(bytes_so_far) / total_size - percent = round(percent*100, 2) - t_s = self.get_size(total_size) - try: b_s = self.get_size(bytes_so_far, t_s.split(" ")[1]) - except: b_s = self.get_size(bytes_so_far) - sys.stdout.write("Downloaded {} of {} ({:.2f}%)\r".format(b_s, t_s, percent)) - else: - b_s = self.get_size(bytes_so_far) - sys.stdout.write("Downloaded {}\r".format(b_s)) - - def get_string(self, url, progress = True, headers = None, expand_gzip = True): - response = self.get_bytes(url,progress,headers,expand_gzip) - if response == None: return None - return self._decode(response) - - def get_bytes(self, url, progress = True, headers = None, expand_gzip = True): - response = self.open_url(url, headers) - if response == None: return None - bytes_so_far = 0 - try: total_size = int(response.headers['Content-Length']) - except: total_size = -1 - chunk_so_far = b"" - while True: - chunk = response.read(self.chunk) - bytes_so_far += len(chunk) - if progress: self._progress_hook(response, bytes_so_far, total_size) - if not chunk: break - chunk_so_far += chunk - if expand_gzip and response.headers.get("Content-Encoding","unknown").lower() == "gzip": - fileobj = BytesIO(chunk_so_far) - gfile = gzip.GzipFile(fileobj=fileobj) - return gfile.read() - return chunk_so_far - - def stream_to_file(self, url, file_path, progress = True, headers = None): - response = self.open_url(url, headers) - if response == None: return None - bytes_so_far = 0 - try: total_size = int(response.headers['Content-Length']) - except: total_size = -1 - with open(file_path, 'wb') as f: - while True: - chunk = response.read(self.chunk) - bytes_so_far += len(chunk) - if progress: self._progress_hook(response, bytes_so_far, total_size) - if not chunk: break - f.write(chunk) - return file_path if os.path.exists(file_path) else None diff --git a/entry_points.ini b/entry_points.ini new file mode 100644 index 0000000..6583cf6 --- /dev/null +++ b/entry_points.ini @@ -0,0 +1,3 @@ +[console_scripts] +mount-efi = mount_efi.__main__:main +MountEFI = mount_efi.__main__:main diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d45ffa3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +appdirs>=1.4.4 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9bcbf3d --- /dev/null +++ b/setup.py @@ -0,0 +1,7 @@ +from setuptools import setup + + +setup( + name="MountEFI", + setup_requires=["setupmeta"], +) diff --git a/src/mount_efi/__about__.py b/src/mount_efi/__about__.py new file mode 100644 index 0000000..1e642a2 --- /dev/null +++ b/src/mount_efi/__about__.py @@ -0,0 +1,5 @@ +__description__ = 'MountEFI' +__version__ = '0.0.0' +__versioning__ = 'post' +__author__ = 'corpnewt' +__url__ = 'https://github.com/corpnewt' diff --git a/src/mount_efi/__init__.py b/src/mount_efi/__init__.py new file mode 100644 index 0000000..50fbfbe --- /dev/null +++ b/src/mount_efi/__init__.py @@ -0,0 +1,6 @@ +from .bdmesg import get_bootloader_uuid +from .disk import Disk +from .reveal import Reveal +from .run import Run +from .utils import Utils + diff --git a/src/mount_efi/__main__.py b/src/mount_efi/__main__.py new file mode 100644 index 0000000..0274155 --- /dev/null +++ b/src/mount_efi/__main__.py @@ -0,0 +1,330 @@ +import argparse +import json +import os + +from appdirs import user_data_dir + +from mount_efi import Disk, Reveal, Run, Utils, get_bootloader_uuid +from mount_efi.__about__ import __author__, __description__ + + +USER_DATA_DIR = user_data_dir(__description__, __author__) +DEFAULT_SETTINGS_BASENAME = 'settings.json' +DEFAULT_SETTINGS_PATH = os.path.join(USER_DATA_DIR, DEFAULT_SETTINGS_BASENAME) + + +class MountEFI: + def __init__(self, **kwargs): + self.r = Run() + self.d = Disk() + self.u = Utils("MountEFI") + self.re = Reveal() + + self.settings_file = kwargs.get("settings", None) + cwd = os.getcwd() + os.chdir(os.path.dirname(os.path.realpath(__file__))) + if self.settings_file and os.path.exists(self.settings_file): + self.settings = json.load(open(self.settings_file)) + else: + self.settings = { + # Default settings here + "default_disk" : None, + "after_mount" : None, + "full_layout" : False, + "skip_countdown" : False, + } + os.chdir(cwd) + self.full = self.settings.get("full_layout", False) + + def flush_settings(self): + if self.settings_file: + settings_dir = os.path.dirname(self.settings_file) + if not os.path.exists(settings_dir): + os.makedirs(settings_dir, exist_ok=True) + json.dump(self.settings, open(self.settings_file, "w")) + + def after_mount(self): + self.u.resize(80, 24) + self.u.head("After Mount Action") + print(" ") + print("1. Return to Menu") + print("2. Quit") + print("3. Open EFI and Return to Menu") + print("4. Open EFI and Quit") + if not self.settings.get("skip_countdown", False): + print("5. Skip After-Mount Countdown") + else: + print("5. Use After-Mount Countdown") + print(" ") + print("M. Main Menu") + print("Q. Quit") + print(" ") + menu = self.u.grab("Please pick an option: ") + if not len(menu): + self.after_mount() + return + menu = menu.lower() + if menu in ["1","2","3","4"]: + self.settings["after_mount"] = [ + "Return to Menu", + "Quit", + "Reveal and Return to Menu", + "Reveal and Quit" + ][int(menu)-1] + self.flush_settings() + return + elif menu == "5": + cd = self.settings.get("skip_countdown", False) + cd ^= True + self.settings["skip_countdown"] = cd + self.flush_settings() + self.after_mount() + return + elif menu == "m": + return + elif menu == "q": + self.u.custom_quit() + self.after_mount() + + def default_disk(self): + self.d.update() + clover = get_bootloader_uuid() + print(clover) + self.u.resize(80, 24) + self.u.head("Select Default Disk") + print(" ") + print("1. None") + print("2. Boot Disk") + if clover: + print("3. Booted EFI (Clover/OC)") + print(" ") + print("M. Main Menu") + print("Q. Quit") + print(" ") + menu = self.u.grab("Please pick a default disk: ") + if not len(menu): + self.default_disk() + menu = menu.lower() + if menu in ["1","2"]: + self.settings["default_disk"] = [None, "boot"][int(menu)-1] + self.flush_settings() + return + elif menu == "3" and clover: + self.settings["default_disk"] = "clover" + self.flush_settings() + return + elif menu == "m": + return + elif menu == "q": + self.u.custom_quit() + self.default_disk() + + def get_efi(self): + self.d.update() + clover = get_bootloader_uuid() + i = 0 + disk_string = "" + if not self.full: + clover_disk = self.d.get_parent(clover) + mounts = self.d.get_mounted_volume_dicts() + for d in mounts: + i += 1 + disk_string += "{}. {} ({})".format(i, d["name"], d["identifier"]) + if self.d.get_parent(d["identifier"]) == clover_disk: + # if d["disk_uuid"] == clover: + disk_string += " *" + disk_string += "\n" + else: + mounts = self.d.get_disks_and_partitions_dict() + disks = mounts.keys() + for d in disks: + i += 1 + disk_string+= "{}. {}:\n".format(i, d) + parts = mounts[d]["partitions"] + part_list = [] + for p in parts: + p_text = " - {} ({})".format(p["name"], p["identifier"]) + if p["disk_uuid"] == clover: + # Got Clover + p_text += " *" + part_list.append(p_text) + if len(part_list): + disk_string += "\n".join(part_list) + "\n" + height = len(disk_string.split("\n"))+16 + if height < 24: + height = 24 + self.u.resize(80, height) + self.u.head() + print(" ") + print(disk_string) + if not self.full: + print("S. Switch to Full Output") + else: + print("S. Switch to Slim Output") + lay = self.settings.get("full_layout", False) + l_str = "Slim" + if lay: + l_str = "Full" + print("L. Set As Default Layout (Current: {})".format(l_str)) + print("B. Mount the Boot Drive's EFI") + if clover: + print("C. Mount the Booted EFI (Clover/OC)") + print("") + + dd = self.settings.get("default_disk", None) + if dd == "clover": + dd = clover + elif dd == "boot": + dd = "/" + di = self.d.get_identifier(dd) + if di: + print("D. Pick Default Disk ({} - {})".format(self.d.get_volume_name(di), di)) + else: + print("D. Pick Default Disk (None Set)") + + am = self.settings.get("after_mount", None) + if not am: + am = "Return to Menu" + print("M. After Mounting: "+am) + print("Q. Quit") + print(" ") + print("(* denotes the booted EFI (Clover/OC))") + + menu = self.u.grab("Pick the drive containing your EFI: ") + if not len(menu): + if not di: + return self.get_efi() + return self.d.get_efi(di) + menu = menu.lower() + if menu == "q": + self.u.resize(80,24) + self.u.custom_quit() + elif menu == "s": + self.full ^= True + return self.get_efi() + elif menu == "b": + return self.d.get_efi("/") + elif menu == "c" and clover: + return self.d.get_efi(clover) + elif menu == "m": + self.after_mount() + return + elif menu == "d": + self.default_disk() + return + elif menu == "l": + self.settings["full_layout"] = self.full + self.flush_settings() + return + try: + disk_iden = int(menu) + if not (disk_iden > 0 and disk_iden <= len(mounts)): + # out of range! + self.u.grab("Invalid disk!", timeout=3) + return self.get_efi() + if type(mounts) is list: + # We have the small list + disk = mounts[disk_iden-1]["identifier"] + else: + # We have the dict + disk = mounts.keys()[disk_iden-1] + except: + disk = menu + iden = self.d.get_identifier(disk) + name = self.d.get_volume_name(disk) + if not iden: + self.u.grab("Invalid disk!", timeout=3) + return self.get_efi() + # Valid disk! + return self.d.get_efi(iden) + + def main(self): + while True: + efi = self.get_efi() + if not efi: + # Got nothing back + continue + # Mount the EFI partition + self.u.head("Mounting {}".format(efi)) + print(" ") + out = self.d.mount_partition(efi) + if out[2] == 0: + print(out[0]) + else: + print(out[1]) + # Check our settings + am = self.settings.get("after_mount", None) + if not am: + continue + if "reveal" in am.lower(): + # Reveal + mp = self.d.get_mount_point(efi) + if mp: + self.r.run({"args":["open", mp]}) + # Hang out for a couple seconds + if not self.settings.get("skip_countdown", False): + self.u.grab("", timeout=3) + if "quit" in am.lower(): + # Quit + self.u.resize(80,24) + self.u.custom_quit() + + def quiet_mount(self, disk_list, unmount=False): + ret = 0 + for disk in disk_list: + ident = self.d.get_identifier(disk) + if not ident: + continue + efi = self.d.get_efi(ident) + if not efi: + continue + if unmount: + out = self.d.unmount_partition(efi) + else: + out = self.d.mount_partition(efi) + if not out[2] == 0: + ret = out[2] + exit(ret) + + +def parse_args(): + parser = argparse.ArgumentParser( + description="MountEFI - an EFI Mounting Utility by CorpNewt" + ) + parser.add_argument( + "-u", "--unmount", action="store_true", + help="unmount instead of mount the passed EFIs" + ) + parser.add_argument( + "-p", "--print-efi", + help="prints the disk#s# of the EFI attached to the passed var" + ) + parser.add_argument( + "-s", "--settings", default=DEFAULT_SETTINGS_PATH, + help="use a JSON settings file (default: '%(default)s')" + ) + parser.add_argument( + "disks", nargs="*", metavar="DISK" + ) + + return parser.parse_args() + + +def main(): + args = parse_args() + + m = MountEFI(settings=args.settings) + + # Gather defaults + unmount = False + if args.unmount: + unmount = True + if args.print_efi: + print("{}".format(m.d.get_efi(args.print_efi))) + + # Check for args + if len(args.disks): + # We got command line args! + m.quiet_mount(args.disks, unmount) + elif not args.print_efi: + m.main() diff --git a/Scripts/bdmesg.py b/src/mount_efi/bdmesg.py similarity index 94% rename from Scripts/bdmesg.py rename to src/mount_efi/bdmesg.py index bd7edbc..f776887 100755 --- a/Scripts/bdmesg.py +++ b/src/mount_efi/bdmesg.py @@ -1,11 +1,14 @@ -import binascii, subprocess, sys +import binascii +import subprocess +import sys + def get_clover_uuid(): bd = bdmesg() if not len(bd): return "" # Get bdmesg output - then parse for SelfDevicePath - if not "SelfDevicePath=" in bd: + if "SelfDevicePath=" not in bd: # Not found return "" try: @@ -54,7 +57,7 @@ def _bdmesg(comm): bd = _decode(bd) for line in bd.split("\n"): # We're just looking for the "boot-log" property, then we need to format it - if not '"boot-log"' in line: + if '"boot-log"' not in line: # Skip it! continue # Must have found it - let's try to split it, then get the hex data and process it diff --git a/Scripts/disk.py b/src/mount_efi/disk.py similarity index 97% rename from Scripts/disk.py rename to src/mount_efi/disk.py index e384ec0..1b02c64 100755 --- a/Scripts/disk.py +++ b/src/mount_efi/disk.py @@ -1,15 +1,19 @@ -import subprocess, plistlib, sys, os, time, json -sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__)))) -import run +import os +import plistlib +import sys + +from .run import Run + if sys.version_info < (3,0): # Force use of StringIO instead of cStringIO as the latter # has issues with Unicode strings from StringIO import StringIO + class Disk: def __init__(self): - self.r = run.Run() + self.r = Run() self.diskutil = self.get_diskutil() self.os_version = ".".join( self.r.run({"args":["sw_vers", "-productVersion"]})[0].split(".")[:2] @@ -73,24 +77,24 @@ def _compare_versions(self, vers1, vers2, pad = -1): # vers1 > vers2 = False # # Must be separated with a period - + # Sanitize the pads pad = -1 if not type(pad) is int else pad - + # Cast as strings vers1 = str(vers1) vers2 = str(vers2) - + # Split to lists v1_parts = vers1.split(".") v2_parts = vers2.split(".") - + # Equalize lengths if len(v1_parts) < len(v2_parts): v1_parts.extend([str(pad) for x in range(len(v2_parts) - len(v1_parts))]) elif len(v2_parts) < len(v1_parts): v2_parts.extend([str(pad) for x in range(len(v1_parts) - len(v2_parts))]) - + # Iterate and compare for i in range(len(v1_parts)): # Remove non-numeric @@ -169,7 +173,7 @@ def is_apfs(self, disk): return None # Takes a disk identifier, and returns whether or not it's apfs for d in self.disks.get("AllDisksAndPartitions", []): - if not "APFSVolumes" in d: + if "APFSVolumes" not in d: continue if d.get("DeviceIdentifier", "").lower() == disk_id.lower(): return True @@ -182,7 +186,7 @@ def is_apfs_container(self, disk): disk_id = self.get_identifier(disk) if not disk_id: return None - # Takes a disk identifier, and returns whether or not that specific + # Takes a disk identifier, and returns whether or not that specific # disk/volume is an APFS Container for d in self.disks.get("AllDisksAndPartitions", []): # Only check partitions @@ -195,7 +199,7 @@ def is_cs_container(self, disk): disk_id = self.get_identifier(disk) if not disk_id: return None - # Takes a disk identifier, and returns whether or not that specific + # Takes a disk identifier, and returns whether or not that specific # disk/volume is an CoreStorage Container for d in self.disks.get("AllDisksAndPartitions", []): # Only check partitions @@ -240,7 +244,7 @@ def get_top_identifier(self, disk): if not disk_id: return None return disk_id.replace("disk", "didk").split("s")[0].replace("didk", "disk") - + def _get_physical_disk(self, disk, search_term): # Change disk0s1 to disk0 our_disk = self.get_top_identifier(disk) @@ -356,7 +360,7 @@ def is_mounted(self, disk): if not disk_id: return None m = self.get_mount_point(disk_id) - return (m != None and len(m)) + return (m is not None and len(m)) def get_volumes(self): # Returns a list object with all volumes from disks @@ -421,7 +425,7 @@ def get_mounted_volume_dicts(self): vol_list = [] for v in self.get_mounted_volumes(): i = self.get_identifier(os.path.join("/Volumes", v)) - if i == None: + if i is None: i = self.get_identifier("/") if not self.get_volume_name(i) == v: # Not valid and not our boot drive @@ -437,12 +441,12 @@ def get_mounted_volume_dicts(self): def get_disks_and_partitions_dict(self): # Returns a list of dictionaries like so: - # { "disk0" : { "partitions" : [ - # { - # "identifier" : "disk0s1", - # "name" : "EFI", + # { "disk0" : { "partitions" : [ + # { + # "identifier" : "disk0s1", + # "name" : "EFI", # "mount_point" : "/Volumes/EFI" - # } + # } # ] } } disks = {} for d in self.disks.get("AllDisks", []): @@ -457,7 +461,7 @@ def get_disks_and_partitions_dict(self): continue if self.is_cs_container(d): continue - if not parent in disks: + if parent not in disks: disks[parent] = { "partitions" : [] } disks[parent]["partitions"].append({ "name" : self.get_volume_name(d), diff --git a/Scripts/rebuildcache.py b/src/mount_efi/rebuildcache.py similarity index 81% rename from Scripts/rebuildcache.py rename to src/mount_efi/rebuildcache.py index 009eb84..425069d 100644 --- a/Scripts/rebuildcache.py +++ b/src/mount_efi/rebuildcache.py @@ -1,10 +1,9 @@ -import os, sys -sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__)))) -import run, utils +from .run import Run + class Rebuild: def __init__(self): - self.r = run.Run() + self.r = Run() return def _compare_versions(self, vers1, vers2): @@ -31,9 +30,9 @@ def _compare_versions(self, vers1, vers2): def rebuild(self, stream = True): # Get os version os_vers = self.r.run({"args":["sw_vers", "-productVersion"]})[0] - if self._compare_versions(os_vers, "10.11.0") == True: + if self._compare_versions(os_vers, "10.11.0") is True: # We're on an OS version prior to 10.11 return self.r.run({"args":"sudo touch /System/Library/Extensions && sudo kextcache -u /", "stream" : stream, "shell" : True}) else: # 10.11 or above - return self.r.run({"args":"sudo kextcache -i / && sudo kextcache -u /", "stream" : stream, "shell" : True}) \ No newline at end of file + return self.r.run({"args":"sudo kextcache -i / && sudo kextcache -u /", "stream" : stream, "shell" : True}) diff --git a/Scripts/reveal.py b/src/mount_efi/reveal.py similarity index 93% rename from Scripts/reveal.py rename to src/mount_efi/reveal.py index f0c7516..7f28ad2 100755 --- a/Scripts/reveal.py +++ b/src/mount_efi/reveal.py @@ -1,13 +1,14 @@ -import sys, os -sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__)))) -import run +import sys +import os +from .run import Run + class Reveal: def __init__(self): - self.r = run.Run() + self.r = Run() return - + def get_parent(self, path): return os.path.normpath(os.path.join(path, os.pardir)) @@ -67,4 +68,4 @@ def notify(self, title = None, subtitle = None, sound = None): if sound: n_text += " sound name \"{}\"".format(sound.replace("\"", "\\\"")) command = ["osascript", "-e", n_text] - return self.r.run({"args" : command}) \ No newline at end of file + return self.r.run({"args" : command}) diff --git a/Scripts/run.py b/src/mount_efi/run.py similarity index 94% rename from Scripts/run.py rename to src/mount_efi/run.py index b586adc..78a255e 100755 --- a/Scripts/run.py +++ b/src/mount_efi/run.py @@ -1,151 +1,157 @@ -import sys, subprocess, time, threading, shlex -try: - from Queue import Queue, Empty -except: - from queue import Queue, Empty - -ON_POSIX = 'posix' in sys.builtin_module_names - -class Run: - - def __init__(self): - return - - def _read_output(self, pipe, q): - try: - for line in iter(lambda: pipe.read(1), b''): - q.put(line) - except ValueError: - pass - pipe.close() - - def _create_thread(self, output): - # Creates a new queue and thread object to watch based on the output pipe sent - q = Queue() - t = threading.Thread(target=self._read_output, args=(output, q)) - t.daemon = True - return (q,t) - - def _stream_output(self, comm, shell = False): - output = error = "" - p = None - try: - if shell and type(comm) is list: - comm = " ".join(shlex.quote(x) for x in comm) - if not shell and type(comm) is str: - comm = shlex.split(comm) - p = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, universal_newlines=True, close_fds=ON_POSIX) - # Setup the stdout thread/queue - q,t = self._create_thread(p.stdout) - qe,te = self._create_thread(p.stderr) - # Start both threads - t.start() - te.start() - - while True: - c = z = "" - try: c = q.get_nowait() - except Empty: pass - else: - sys.stdout.write(c) - output += c - sys.stdout.flush() - try: z = qe.get_nowait() - except Empty: pass - else: - sys.stderr.write(z) - error += z - sys.stderr.flush() - if not c==z=="": continue # Keep going until empty - # No output - see if still running - p.poll() - if p.returncode != None: - # Subprocess ended - break - # No output, but subprocess still running - stall for 20ms - time.sleep(0.02) - - o, e = p.communicate() - return (output+o, error+e, p.returncode) - except: - if p: - try: o, e = p.communicate() - except: o = e = "" - return (output+o, error+e, p.returncode) - return ("", "Command not found!", 1) - - def _decode(self, value, encoding="utf-8", errors="ignore"): - # Helper method to only decode if bytes type - if sys.version_info >= (3,0) and isinstance(value, bytes): - return value.decode(encoding,errors) - return value - - def _run_command(self, comm, shell = False): - c = None - try: - if shell and type(comm) is list: - comm = " ".join(shlex.quote(x) for x in comm) - if not shell and type(comm) is str: - comm = shlex.split(comm) - p = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - c = p.communicate() - except: - if c == None: - return ("", "Command not found!", 1) - return (self._decode(c[0]), self._decode(c[1]), p.returncode) - - def run(self, command_list, leave_on_fail = False): - # Command list should be an array of dicts - if type(command_list) is dict: - # We only have one command - command_list = [command_list] - output_list = [] - for comm in command_list: - args = comm.get("args", []) - shell = comm.get("shell", False) - stream = comm.get("stream", False) - sudo = comm.get("sudo", False) - stdout = comm.get("stdout", False) - stderr = comm.get("stderr", False) - mess = comm.get("message", None) - show = comm.get("show", False) - - if not mess == None: - print(mess) - - if not len(args): - # nothing to process - continue - if sudo: - # Check if we have sudo - out = self._run_command(["which", "sudo"]) - if "sudo" in out[0]: - # Can sudo - if type(args) is list: - args.insert(0, out[0].replace("\n", "")) # add to start of list - elif type(args) is str: - args = out[0].replace("\n", "") + " " + args # add to start of string - - if show: - print(" ".join(args)) - - if stream: - # Stream it! - out = self._stream_output(args, shell) - else: - # Just run and gather output - out = self._run_command(args, shell) - if stdout and len(out[0]): - print(out[0]) - if stderr and len(out[1]): - print(out[1]) - # Append output - output_list.append(out) - # Check for errors - if leave_on_fail and out[2] != 0: - # Got an error - leave - break - if len(output_list) == 1: - # We only ran one command - just return that output - return output_list[0] - return output_list +import sys +import subprocess +import time +import threading +import shlex + +try: + from Queue import Queue, Empty +except: + from queue import Queue, Empty + +ON_POSIX = 'posix' in sys.builtin_module_names + + +class Run: + + def __init__(self): + return + + def _read_output(self, pipe, q): + try: + for line in iter(lambda: pipe.read(1), b''): + q.put(line) + except ValueError: + pass + pipe.close() + + def _create_thread(self, output): + # Creates a new queue and thread object to watch based on the output pipe sent + q = Queue() + t = threading.Thread(target=self._read_output, args=(output, q)) + t.daemon = True + return (q,t) + + def _stream_output(self, comm, shell = False): + output = error = "" + p = None + try: + if shell and type(comm) is list: + comm = " ".join(shlex.quote(x) for x in comm) + if not shell and type(comm) is str: + comm = shlex.split(comm) + p = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, universal_newlines=True, close_fds=ON_POSIX) + # Setup the stdout thread/queue + q,t = self._create_thread(p.stdout) + qe,te = self._create_thread(p.stderr) + # Start both threads + t.start() + te.start() + + while True: + c = z = "" + try: c = q.get_nowait() + except Empty: pass + else: + sys.stdout.write(c) + output += c + sys.stdout.flush() + try: z = qe.get_nowait() + except Empty: pass + else: + sys.stderr.write(z) + error += z + sys.stderr.flush() + if not c==z=="": continue # Keep going until empty + # No output - see if still running + p.poll() + if p.returncode is not None: + # Subprocess ended + break + # No output, but subprocess still running - stall for 20ms + time.sleep(0.02) + + o, e = p.communicate() + return (output+o, error+e, p.returncode) + except: + if p: + try: o, e = p.communicate() + except: o = e = "" + return (output+o, error+e, p.returncode) + return ("", "Command not found!", 1) + + def _decode(self, value, encoding="utf-8", errors="ignore"): + # Helper method to only decode if bytes type + if sys.version_info >= (3,0) and isinstance(value, bytes): + return value.decode(encoding,errors) + return value + + def _run_command(self, comm, shell = False): + c = None + try: + if shell and type(comm) is list: + comm = " ".join(shlex.quote(x) for x in comm) + if not shell and type(comm) is str: + comm = shlex.split(comm) + p = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + c = p.communicate() + except: + if c is None: + return ("", "Command not found!", 1) + return (self._decode(c[0]), self._decode(c[1]), p.returncode) + + def run(self, command_list, leave_on_fail = False): + # Command list should be an array of dicts + if type(command_list) is dict: + # We only have one command + command_list = [command_list] + output_list = [] + for comm in command_list: + args = comm.get("args", []) + shell = comm.get("shell", False) + stream = comm.get("stream", False) + sudo = comm.get("sudo", False) + stdout = comm.get("stdout", False) + stderr = comm.get("stderr", False) + mess = comm.get("message", None) + show = comm.get("show", False) + + if mess is not None: + print(mess) + + if not len(args): + # nothing to process + continue + if sudo: + # Check if we have sudo + out = self._run_command(["which", "sudo"]) + if "sudo" in out[0]: + # Can sudo + if type(args) is list: + args.insert(0, out[0].replace("\n", "")) # add to start of list + elif type(args) is str: + args = out[0].replace("\n", "") + " " + args # add to start of string + + if show: + print(" ".join(args)) + + if stream: + # Stream it! + out = self._stream_output(args, shell) + else: + # Just run and gather output + out = self._run_command(args, shell) + if stdout and len(out[0]): + print(out[0]) + if stderr and len(out[1]): + print(out[1]) + # Append output + output_list.append(out) + # Check for errors + if leave_on_fail and out[2] != 0: + # Got an error - leave + break + if len(output_list) == 1: + # We only ran one command - just return that output + return output_list[0] + return output_list diff --git a/Scripts/utils.py b/src/mount_efi/utils.py similarity index 97% rename from Scripts/utils.py rename to src/mount_efi/utils.py index 9ee4b30..da97f7d 100755 --- a/Scripts/utils.py +++ b/src/mount_efi/utils.py @@ -1,4 +1,11 @@ -import sys, os, time, re, json, datetime, ctypes, subprocess +import ctypes +import datetime +import json +import os +import subprocess +import sys +import time +from builtins import input if os.name == "nt": # Windows @@ -7,6 +14,7 @@ # Not Windows \o/ import select + class Utils: def __init__(self, name = "Python Script"): @@ -41,31 +49,31 @@ def elevate(self, file): os.execv(c, [ sys.executable, 'python'] + sys.argv) except: exit(1) - + def compare_versions(self, vers1, vers2, **kwargs): # Helper method to compare ##.## strings # # vers1 < vers2 = True # vers1 = vers2 = None # vers1 > vers2 = False - + # Sanitize the pads pad = str(kwargs.get("pad", "")) sep = str(kwargs.get("separator", ".")) ignore_case = kwargs.get("ignore_case", True) - + # Cast as strings vers1 = str(vers1) vers2 = str(vers2) - + if ignore_case: vers1 = vers1.lower() vers2 = vers2.lower() # Split and pad lists v1_parts, v2_parts = self.pad_length(vers1.split(sep), vers2.split(sep)) - + # Iterate and compare for i in range(len(v1_parts)): # Remove non-numeric @@ -98,7 +106,7 @@ def pad_length(self, var1, var2, pad = "0"): else: var2 = "{}{}".format((pad*(len(var1)-len(var2))), var2) return (var1, var2) - + def check_path(self, path): # Let's loop until we either get a working path, or no changes test_path = path @@ -143,7 +151,7 @@ def grab(self, prompt, **kwargs): if sys.version_info >= (3, 0): return input(prompt) else: - return str(raw_input(prompt)) + return str(input(prompt)) # Write our prompt sys.stdout.write(prompt) sys.stdout.flush() @@ -170,7 +178,7 @@ def grab(self, prompt, **kwargs): return default def cls(self): - os.system('cls' if os.name=='nt' else 'clear') + os.system('cls' if os.name=='nt' else 'clear') def cprint(self, message, **kwargs): strip_colors = kwargs.get("strip_colors", False)