|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai |
| 3 | +# Name: udisks.py |
| 4 | +# Purpose: Module to mount unmount and eject using dbus and udisk |
| 5 | +# Authors: Original author is Kovid Goyal <kovid@kovidgoyal.net> and python3 |
| 6 | +# supporte by Sundar for multibootusb project |
| 7 | +# Licence: 'GPL v3' as per original Licence |
| 8 | + |
| 9 | +__license__ = 'GPL v3' |
| 10 | +__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>' |
| 11 | +__docformat__ = 'restructuredtext en' |
| 12 | + |
| 13 | +# from __future__ import print_function |
| 14 | +import os, re |
| 15 | + |
| 16 | + |
| 17 | +def node_mountpoint(node): |
| 18 | + |
| 19 | + def de_mangle(raw): |
| 20 | + return raw.replace('\\040', ' ').replace('\\011', '\t').replace('\\012', |
| 21 | + '\n').replace('\\0134', '\\') |
| 22 | + |
| 23 | + for line in open('/proc/mounts').readlines(): |
| 24 | + line = line.split() |
| 25 | + if line[0] == node: |
| 26 | + return de_mangle(line[1]) |
| 27 | + return None |
| 28 | + |
| 29 | + |
| 30 | +class NoUDisks1(Exception): |
| 31 | + pass |
| 32 | + |
| 33 | + |
| 34 | +class UDisks(object): |
| 35 | + |
| 36 | + def __init__(self): |
| 37 | + import dbus |
| 38 | + self.bus = dbus.SystemBus() |
| 39 | + try: |
| 40 | + self.main = dbus.Interface(self.bus.get_object('org.freedesktop.UDisks', |
| 41 | + '/org/freedesktop/UDisks'), 'org.freedesktop.UDisks') |
| 42 | + except dbus.exceptions.DBusException as e: |
| 43 | + if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown': |
| 44 | + raise NoUDisks1() |
| 45 | + raise |
| 46 | + |
| 47 | + def device(self, device_node_path): |
| 48 | + import dbus |
| 49 | + devpath = self.main.FindDeviceByDeviceFile(device_node_path) |
| 50 | + return dbus.Interface(self.bus.get_object('org.freedesktop.UDisks', |
| 51 | + devpath), 'org.freedesktop.UDisks.Device') |
| 52 | + |
| 53 | + def mount(self, device_node_path): |
| 54 | + d = self.device(device_node_path) |
| 55 | + try: |
| 56 | + return str(d.FilesystemMount('', |
| 57 | + ['auth_no_user_interaction', 'rw', 'noexec', 'nosuid', |
| 58 | + 'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()])) |
| 59 | + except: |
| 60 | + # May be already mounted, check |
| 61 | + mp = node_mountpoint(str(device_node_path)) |
| 62 | + if mp is None: |
| 63 | + raise |
| 64 | + return mp |
| 65 | + |
| 66 | + def unmount(self, device_node_path): |
| 67 | + d = self.device(device_node_path) |
| 68 | + d.FilesystemUnmount(['force']) |
| 69 | + |
| 70 | + def eject(self, device_node_path): |
| 71 | + parent = device_node_path |
| 72 | + while parent[-1] in '0123456789': |
| 73 | + parent = parent[:-1] |
| 74 | + d = self.device(parent) |
| 75 | + d.DriveEject([]) |
| 76 | + |
| 77 | + |
| 78 | +class NoUDisks2(Exception): |
| 79 | + pass |
| 80 | + |
| 81 | + |
| 82 | +class UDisks2(object): |
| 83 | + |
| 84 | + BLOCK = 'org.freedesktop.UDisks2.Block' |
| 85 | + FILESYSTEM = 'org.freedesktop.UDisks2.Filesystem' |
| 86 | + DRIVE = 'org.freedesktop.UDisks2.Drive' |
| 87 | + |
| 88 | + def __init__(self): |
| 89 | + import dbus |
| 90 | + self.bus = dbus.SystemBus() |
| 91 | + try: |
| 92 | + self.bus.get_object('org.freedesktop.UDisks2', |
| 93 | + '/org/freedesktop/UDisks2') |
| 94 | + except dbus.exceptions.DBusException as e: |
| 95 | + if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown': |
| 96 | + raise NoUDisks2() |
| 97 | + raise |
| 98 | + |
| 99 | + def device(self, device_node_path): |
| 100 | + device_node_path = os.path.realpath(device_node_path) |
| 101 | + devname = device_node_path.split('/')[-1] |
| 102 | + |
| 103 | + # First we try a direct object path |
| 104 | + bd = self.bus.get_object('org.freedesktop.UDisks2', |
| 105 | + '/org/freedesktop/UDisks2/block_devices/%s'%devname) |
| 106 | + try: |
| 107 | + device = bd.Get(self.BLOCK, 'Device', |
| 108 | + dbus_interface='org.freedesktop.DBus.Properties') |
| 109 | + device = bytearray(device).replace(b'\x00', b'').decode('utf-8') |
| 110 | + except: |
| 111 | + device = None |
| 112 | + |
| 113 | + if device == device_node_path: |
| 114 | + return bd |
| 115 | + |
| 116 | + # Enumerate all devices known to UDisks |
| 117 | + devs = self.bus.get_object('org.freedesktop.UDisks2', |
| 118 | + '/org/freedesktop/UDisks2/block_devices') |
| 119 | + xml = devs.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable') |
| 120 | + for dev in re.finditer(r'name=[\'"](.+?)[\'"]', type('')(xml)): |
| 121 | + bd = self.bus.get_object('org.freedesktop.UDisks2', |
| 122 | + '/org/freedesktop/UDisks2/block_devices/%s2'%dev.group(1)) |
| 123 | + try: |
| 124 | + device = bd.Get(self.BLOCK, 'Device', |
| 125 | + dbus_interface='org.freedesktop.DBus.Properties') |
| 126 | + device = bytearray(device).replace(b'\x00', b'').decode('utf-8') |
| 127 | + except: |
| 128 | + device = None |
| 129 | + if device == device_node_path: |
| 130 | + return bd |
| 131 | + |
| 132 | + raise ValueError('%r not known to UDisks2'%device_node_path) |
| 133 | + |
| 134 | + def mount(self, device_node_path): |
| 135 | + d = self.device(device_node_path) |
| 136 | + mount_options = ['rw', 'noexec', 'nosuid', |
| 137 | + 'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()] |
| 138 | + try: |
| 139 | + return str(d.Mount( |
| 140 | + { |
| 141 | + 'auth.no_user_interaction':True, |
| 142 | + 'options':','.join(mount_options) |
| 143 | + }, |
| 144 | + dbus_interface=self.FILESYSTEM)) |
| 145 | + except: |
| 146 | + # May be already mounted, check |
| 147 | + mp = node_mountpoint(str(device_node_path)) |
| 148 | + if mp is None: |
| 149 | + raise |
| 150 | + return mp |
| 151 | + |
| 152 | + def unmount(self, device_node_path): |
| 153 | + d = self.device(device_node_path) |
| 154 | + d.Unmount({'force':True, 'auth.no_user_interaction':True}, |
| 155 | + dbus_interface=self.FILESYSTEM) |
| 156 | + |
| 157 | + def drive_for_device(self, device): |
| 158 | + drive = device.Get(self.BLOCK, 'Drive', |
| 159 | + dbus_interface='org.freedesktop.DBus.Properties') |
| 160 | + return self.bus.get_object('org.freedesktop.UDisks2', drive) |
| 161 | + |
| 162 | + def eject(self, device_node_path): |
| 163 | + drive = self.drive_for_device(self.device(device_node_path)) |
| 164 | + drive.Eject({'auth.no_user_interaction':True}, |
| 165 | + dbus_interface=self.DRIVE) |
| 166 | + |
| 167 | + |
| 168 | +def get_udisks(ver=None): |
| 169 | + if ver is None: |
| 170 | + try: |
| 171 | + u = UDisks2() |
| 172 | + except NoUDisks2: |
| 173 | + u = UDisks() |
| 174 | + return u |
| 175 | + return UDisks2() if ver == 2 else UDisks() |
| 176 | + |
| 177 | + |
| 178 | +def get_udisks1(): |
| 179 | + u = None |
| 180 | + try: |
| 181 | + u = UDisks() |
| 182 | + except NoUDisks1: |
| 183 | + try: |
| 184 | + u = UDisks2() |
| 185 | + except NoUDisks2: |
| 186 | + pass |
| 187 | + if u is None: |
| 188 | + raise EnvironmentError('UDisks not available on your system') |
| 189 | + return u |
| 190 | + |
| 191 | + |
| 192 | +def mount(node_path): |
| 193 | + u = get_udisks1() |
| 194 | + u.mount(node_path) |
| 195 | + |
| 196 | + |
| 197 | +def eject(node_path): |
| 198 | + u = get_udisks1() |
| 199 | + u.eject(node_path) |
| 200 | + |
| 201 | + |
| 202 | +def umount(node_path): |
| 203 | + u = get_udisks1() |
| 204 | + u.unmount(node_path) |
| 205 | + |
| 206 | + |
| 207 | +def test_udisks(ver=None): |
| 208 | + import sys |
| 209 | + dev = sys.argv[1] |
| 210 | + print('Testing with node', dev) |
| 211 | + u = get_udisks(ver=ver) |
| 212 | + print('Using Udisks:', u.__class__.__name__) |
| 213 | + print('Mounted at:', u.mount(dev)) |
| 214 | + print('Unmounting') |
| 215 | + u.unmount(dev) |
| 216 | + print('Mounting') |
| 217 | + u.mount(dev) |
| 218 | + print('Ejecting:') |
| 219 | + u.eject(dev) |
| 220 | + |
| 221 | +if __name__ == '__main__': |
| 222 | + print('Run test here...') |
| 223 | + # test_udisks() |
0 commit comments