UNPKG

22.2 kBPlain TextView Raw
1# -*- coding: utf-8 -*-
2from __future__ import unicode_literals
3from __future__ import division
4
5import struct
6import datetime
7import io
8import re
9import os
10import os.path
11import stat
12import sys
13
14if sys.platform == 'darwin':
15 from . import osx
16
17try:
18 long
19except NameError:
20 long = int
21
22from .utils import *
23
24ALIAS_KIND_FILE = 0
25ALIAS_KIND_FOLDER = 1
26
27ALIAS_HFS_VOLUME_SIGNATURE = b'H+'
28
29ALIAS_FIXED_DISK = 0
30ALIAS_NETWORK_DISK = 1
31ALIAS_400KB_FLOPPY_DISK = 2
32ALIAS_800KB_FLOPPY_DISK = 3
33ALIAS_1_44MB_FLOPPY_DISK = 4
34ALIAS_EJECTABLE_DISK = 5
35
36ALIAS_NO_CNID = 0xffffffff
37
38def encode_utf8(s):
39 if isinstance(s, bytes):
40 return s
41 return s.encode('utf-8')
42
43def decode_utf8(s):
44 if isinstance(s, bytes):
45 return s.decode('utf-8')
46 return s
47
48class AppleShareInfo (object):
49 def __init__(self, zone=None, server=None, user=None):
50 #: The AppleShare zone
51 self.zone = zone
52 #: The AFP server
53 self.server = server
54 #: The username
55 self.user = user
56
57 def __repr__(self):
58 return 'AppleShareInfo(%r,%r,%r)' % (self.zone, self.server, self.user)
59
60class VolumeInfo (object):
61 def __init__(self, name, creation_date, fs_type, disk_type,
62 attribute_flags, fs_id, appleshare_info=None,
63 driver_name=None, posix_path=None, disk_image_alias=None,
64 dialup_info=None, network_mount_info=None):
65 #: The name of the volume on which the target resides
66 self.name = name
67
68 #: The creation date of the target's volume
69 self.creation_date = creation_date
70
71 #: The filesystem type (a two character code, e.g. ``b'H+'`` for HFS+)
72 self.fs_type = fs_type
73
74 #: The type of disk; should be one of
75 #:
76 #: * ALIAS_FIXED_DISK
77 #: * ALIAS_NETWORK_DISK
78 #: * ALIAS_400KB_FLOPPY_DISK
79 #: * ALIAS_800KB_FLOPPY_DISK
80 #: * ALIAS_1_44MB_FLOPPY_DISK
81 #: * ALIAS_EJECTABLE_DISK
82 self.disk_type = disk_type
83
84 #: Filesystem attribute flags (from HFS volume header)
85 self.attribute_flags = attribute_flags
86
87 #: Filesystem identifier
88 self.fs_id = fs_id
89
90 #: AppleShare information (for automatic remounting of network shares)
91 #: *(optional)*
92 self.appleshare_info = appleshare_info
93
94 #: Driver name (*probably* contains a disk driver name on older Macs)
95 #: *(optional)*
96 self.driver_name = driver_name
97
98 #: POSIX path of the mount point of the target's volume
99 #: *(optional)*
100 self.posix_path = posix_path
101
102 #: :class:`Alias` object pointing at the disk image on which the
103 #: target's volume resides *(optional)*
104 self.disk_image_alias = disk_image_alias
105
106 #: Dialup information (for automatic establishment of dialup connections)
107 self.dialup_info = dialup_info
108
109 #: Network mount information (for automatic remounting)
110 self.network_mount_info = network_mount_info
111
112 def __repr__(self):
113 args = ['name', 'creation_date', 'fs_type', 'disk_type',
114 'attribute_flags', 'fs_id']
115 values = []
116 for a in args:
117 v = getattr(self, a)
118 values.append(repr(v))
119
120 kwargs = ['appleshare_info', 'driver_name', 'posix_path',
121 'disk_image_alias', 'dialup_info', 'network_mount_info']
122 for a in kwargs:
123 v = getattr(self, a)
124 if v is not None:
125 values.append('%s=%r' % (a, v))
126 return 'VolumeInfo(%s)' % ','.join(values)
127
128class TargetInfo (object):
129 def __init__(self, kind, filename, folder_cnid, cnid, creation_date,
130 creator_code, type_code, levels_from=-1, levels_to=-1,
131 folder_name=None, cnid_path=None, carbon_path=None,
132 posix_path=None, user_home_prefix_len=None):
133 #: Either ALIAS_KIND_FILE or ALIAS_KIND_FOLDER
134 self.kind = kind
135
136 #: The filename of the target
137 self.filename = filename
138
139 #: The CNID (Catalog Node ID) of the target's containing folder;
140 #: CNIDs are similar to but different than traditional UNIX inode
141 #: numbers
142 self.folder_cnid = folder_cnid
143
144 #: The CNID (Catalog Node ID) of the target
145 self.cnid = cnid
146
147 #: The target's *creation* date.
148 self.creation_date = creation_date
149
150 #: The target's Mac creator code (a four-character binary string)
151 self.creator_code = creator_code
152
153 #: The target's Mac type code (a four-character binary string)
154 self.type_code = type_code
155
156 #: The depth of the alias? Always seems to be -1 on OS X.
157 self.levels_from = levels_from
158
159 #: The depth of the target? Always seems to be -1 on OS X.
160 self.levels_to = levels_to
161
162 #: The (POSIX) name of the target's containing folder. *(optional)*
163 self.folder_name = folder_name
164
165 #: The path from the volume root as a sequence of CNIDs. *(optional)*
166 self.cnid_path = cnid_path
167
168 #: The Carbon path of the target *(optional)*
169 self.carbon_path = carbon_path
170
171 #: The POSIX path of the target relative to the volume root. Note
172 #: that this may or may not have a leading '/' character, but it is
173 #: always relative to the containing volume. *(optional)*
174 self.posix_path = posix_path
175
176 #: If the path points into a user's home folder, the number of folders
177 #: deep that we go before we get to that home folder. *(optional)*
178 self.user_home_prefix_len = user_home_prefix_len
179
180 def __repr__(self):
181 args = ['kind', 'filename', 'folder_cnid', 'cnid', 'creation_date',
182 'creator_code', 'type_code']
183 values = []
184 for a in args:
185 v = getattr(self, a)
186 values.append(repr(v))
187
188 if self.levels_from != -1:
189 values.append('levels_from=%r' % self.levels_from)
190 if self.levels_to != -1:
191 values.append('levels_to=%r' % self.levels_to)
192
193 kwargs = ['folder_name', 'cnid_path', 'carbon_path',
194 'posix_path', 'user_home_prefix_len']
195 for a in kwargs:
196 v = getattr(self, a)
197 values.append('%s=%r' % (a, v))
198
199 return 'TargetInfo(%s)' % ','.join(values)
200
201TAG_CARBON_FOLDER_NAME = 0
202TAG_CNID_PATH = 1
203TAG_CARBON_PATH = 2
204TAG_APPLESHARE_ZONE = 3
205TAG_APPLESHARE_SERVER_NAME = 4
206TAG_APPLESHARE_USERNAME = 5
207TAG_DRIVER_NAME = 6
208TAG_NETWORK_MOUNT_INFO = 9
209TAG_DIALUP_INFO = 10
210TAG_UNICODE_FILENAME = 14
211TAG_UNICODE_VOLUME_NAME = 15
212TAG_HIGH_RES_VOLUME_CREATION_DATE = 16
213TAG_HIGH_RES_CREATION_DATE = 17
214TAG_POSIX_PATH = 18
215TAG_POSIX_PATH_TO_MOUNTPOINT = 19
216TAG_RECURSIVE_ALIAS_OF_DISK_IMAGE = 20
217TAG_USER_HOME_LENGTH_PREFIX = 21
218
219class Alias (object):
220 def __init__(self, appinfo=b'\0\0\0\0', version=2, volume=None,
221 target=None, extra=[]):
222 """Construct a new :class:`Alias` object with the specified
223 contents."""
224
225 #: Application specific information (four byte byte-string)
226 self.appinfo = appinfo
227
228 #: Version (we support only version 2)
229 self.version = version
230
231 #: A :class:`VolumeInfo` object describing the target's volume
232 self.volume = volume
233
234 #: A :class:`TargetInfo` object describing the target
235 self.target = target
236
237 #: A list of extra `(tag, value)` pairs
238 self.extra = list(extra)
239
240 @classmethod
241 def _from_fd(cls, b):
242 appinfo, recsize, version = struct.unpack(b'>4shh', b.read(8))
243
244 if recsize < 150:
245 raise ValueError('Incorrect alias length')
246
247 if version != 2:
248 raise ValueError('Unsupported alias version %u' % version)
249
250 kind, volname, voldate, fstype, disktype, \
251 folder_cnid, filename, cnid, crdate, creator_code, type_code, \
252 levels_from, levels_to, volattrs, volfsid, reserved = \
253 struct.unpack(b'>h28pI2shI64pII4s4shhI2s10s', b.read(142))
254
255 voldate = mac_epoch + datetime.timedelta(seconds=voldate)
256 crdate = mac_epoch + datetime.timedelta(seconds=crdate)
257
258 alias = Alias()
259 alias.appinfo = appinfo
260
261 alias.volume = VolumeInfo (volname.replace('/',':'),
262 voldate, fstype, disktype,
263 volattrs, volfsid)
264 alias.target = TargetInfo (kind, filename.replace('/',':'),
265 folder_cnid, cnid,
266 crdate, creator_code, type_code)
267 alias.target.levels_from = levels_from
268 alias.target.levels_to = levels_to
269
270 tag = struct.unpack(b'>h', b.read(2))[0]
271
272 while tag != -1:
273 length = struct.unpack(b'>h', b.read(2))[0]
274 value = b.read(length)
275 if length & 1:
276 b.read(1)
277
278 if tag == TAG_CARBON_FOLDER_NAME:
279 alias.target.folder_name = value.replace('/',':')
280 elif tag == TAG_CNID_PATH:
281 alias.target.cnid_path = struct.unpack(b'>%uI' % (length // 4),
282 value)
283 elif tag == TAG_CARBON_PATH:
284 alias.target.carbon_path = value
285 elif tag == TAG_APPLESHARE_ZONE:
286 if alias.volume.appleshare_info is None:
287 alias.volume.appleshare_info = AppleShareInfo()
288 alias.volume.appleshare_info.zone = value
289 elif tag == TAG_APPLESHARE_SERVER_NAME:
290 if alias.volume.appleshare_info is None:
291 alias.volume.appleshare_info = AppleShareInfo()
292 alias.volume.appleshare_info.server = value
293 elif tag == TAG_APPLESHARE_USERNAME:
294 if alias.volume.appleshare_info is None:
295 alias.volume.appleshare_info = AppleShareInfo()
296 alias.volume.appleshare_info.user = value
297 elif tag == TAG_DRIVER_NAME:
298 alias.volume.driver_name = value
299 elif tag == TAG_NETWORK_MOUNT_INFO:
300 alias.volume.network_mount_info = value
301 elif tag == TAG_DIALUP_INFO:
302 alias.volume.dialup_info = value
303 elif tag == TAG_UNICODE_FILENAME:
304 alias.target.filename = value[2:].decode('utf-16be')
305 elif tag == TAG_UNICODE_VOLUME_NAME:
306 alias.volume.name = value[2:].decode('utf-16be')
307 elif tag == TAG_HIGH_RES_VOLUME_CREATION_DATE:
308 seconds = struct.unpack(b'>Q', value)[0] / 65536.0
309 alias.volume.creation_date \
310 = mac_epoch + datetime.timedelta(seconds=seconds)
311 elif tag == TAG_HIGH_RES_CREATION_DATE:
312 seconds = struct.unpack(b'>Q', value)[0] / 65536.0
313 alias.target.creation_date \
314 = mac_epoch + datetime.timedelta(seconds=seconds)
315 elif tag == TAG_POSIX_PATH:
316 alias.target.posix_path = value
317 elif tag == TAG_POSIX_PATH_TO_MOUNTPOINT:
318 alias.volume.posix_path = value
319 elif tag == TAG_RECURSIVE_ALIAS_OF_DISK_IMAGE:
320 alias.volume.disk_image_alias = Alias.from_bytes(value)
321 elif tag == TAG_USER_HOME_LENGTH_PREFIX:
322 alias.target.user_home_prefix_len = struct.unpack(b'>h', value)[0]
323 else:
324 alias.extra.append((tag, value))
325
326 tag = struct.unpack(b'>h', b.read(2))[0]
327
328 return alias
329
330 @classmethod
331 def from_bytes(cls, bytes):
332 """Construct an :class:`Alias` object given binary Alias data."""
333 with io.BytesIO(bytes) as b:
334 return cls._from_fd(b)
335
336 @classmethod
337 def for_file(cls, path):
338 """Create an :class:`Alias` that points at the specified file."""
339 if sys.platform != 'darwin':
340 raise Exception('Not implemented (requires special support)')
341
342 path = encode_utf8(path)
343
344 a = Alias()
345
346 # Find the filesystem
347 st = osx.statfs(path)
348 vol_path = st.f_mntonname
349
350 # Grab its attributes
351 attrs = [osx.ATTR_CMN_CRTIME,
352 osx.ATTR_VOL_NAME,
353 0, 0, 0]
354 volinfo = osx.getattrlist(vol_path, attrs, 0)
355
356 vol_crtime = volinfo[0]
357 vol_name = encode_utf8(volinfo[1])
358
359 # Also grab various attributes of the file
360 attrs = [(osx.ATTR_CMN_OBJTYPE
361 | osx.ATTR_CMN_CRTIME
362 | osx.ATTR_CMN_FNDRINFO
363 | osx.ATTR_CMN_FILEID
364 | osx.ATTR_CMN_PARENTID), 0, 0, 0, 0]
365 info = osx.getattrlist(path, attrs, osx.FSOPT_NOFOLLOW)
366
367 if info[0] == osx.VDIR:
368 kind = ALIAS_KIND_FOLDER
369 else:
370 kind = ALIAS_KIND_FILE
371
372 cnid = info[3]
373 folder_cnid = info[4]
374
375 dirname, filename = os.path.split(path)
376
377 if dirname == b'' or dirname == b'.':
378 dirname = os.getcwd()
379
380 foldername = os.path.basename(dirname)
381
382 creation_date = info[1]
383
384 if kind == ALIAS_KIND_FILE:
385 creator_code = struct.pack(b'I', info[2].fileInfo.fileCreator)
386 type_code = struct.pack(b'I', info[2].fileInfo.fileType)
387 else:
388 creator_code = b'\0\0\0\0'
389 type_code = b'\0\0\0\0'
390
391 a.target = TargetInfo(kind, filename, folder_cnid, cnid, creation_date,
392 creator_code, type_code)
393 a.volume = VolumeInfo(vol_name, vol_crtime, b'H+',
394 ALIAS_FIXED_DISK, 0, b'\0\0')
395
396 a.target.folder_name = foldername
397 a.volume.posix_path = vol_path
398
399 rel_path = os.path.relpath(path, vol_path)
400
401 # Leave off the initial '/' if vol_path is '/' (no idea why)
402 if vol_path == b'/':
403 a.target.posix_path = rel_path
404 else:
405 a.target.posix_path = b'/' + rel_path
406
407 # Construct the Carbon and CNID paths
408 carbon_path = []
409 cnid_path = []
410 head, tail = os.path.split(rel_path)
411 if not tail:
412 head, tail = os.path.split(head)
413 while head or tail:
414 if head:
415 attrs = [osx.ATTR_CMN_FILEID, 0, 0, 0, 0]
416 info = osx.getattrlist(os.path.join(vol_path, head), attrs, 0)
417 cnid_path.append(info[0])
418 carbon_tail = tail.replace(b':',b'/')
419 carbon_path.insert(0, carbon_tail)
420 head, tail = os.path.split(head)
421
422 carbon_path = vol_name + b':' + b':\0'.join(carbon_path)
423
424 a.target.carbon_path = carbon_path
425 a.target.cnid_path = cnid_path
426
427 return a
428
429 def _to_fd(self, b):
430 # We'll come back and fix the length when we're done
431 pos = b.tell()
432 b.write(struct.pack(b'>4shh', self.appinfo, 0, self.version))
433
434 carbon_volname = encode_utf8(self.volume.name).replace(b':',b'/')
435 carbon_filename = encode_utf8(self.target.filename).replace(b':',b'/')
436 voldate = (self.volume.creation_date - mac_epoch).total_seconds()
437 crdate = (self.target.creation_date - mac_epoch).total_seconds()
438
439 # NOTE: crdate should be in local time, but that's system dependent
440 # (so doing so is ridiculous, and nothing could rely on it).
441 b.write(struct.pack(b'>h28pI2shI64pII4s4shhI2s10s',
442 self.target.kind,
443 carbon_volname, int(voldate),
444 self.volume.fs_type,
445 self.volume.disk_type,
446 self.target.folder_cnid,
447 carbon_filename,
448 self.target.cnid,
449 int(crdate),
450 self.target.creator_code,
451 self.target.type_code,
452 self.target.levels_from,
453 self.target.levels_to,
454 self.volume.attribute_flags,
455 self.volume.fs_id,
456 b'\0'*10))
457
458 # Excuse the odd order; we're copying Finder
459 if self.target.folder_name:
460 carbon_foldername = encode_utf8(self.target.folder_name)\
461 .replace(b':',b'/')
462 b.write(struct.pack(b'>hh', TAG_CARBON_FOLDER_NAME,
463 len(carbon_foldername)))
464 b.write(carbon_foldername)
465 if len(carbon_foldername) & 1:
466 b.write(b'\0')
467
468 b.write(struct.pack(b'>hhQhhQ',
469 TAG_HIGH_RES_VOLUME_CREATION_DATE,
470 8, long(voldate * 65536),
471 TAG_HIGH_RES_CREATION_DATE,
472 8, long(crdate * 65536)))
473
474 if self.target.cnid_path:
475 cnid_path = struct.pack(b'>%uI' % len(self.target.cnid_path),
476 *self.target.cnid_path)
477 b.write(struct.pack(b'>hh', TAG_CNID_PATH,
478 len(cnid_path)))
479 b.write(cnid_path)
480
481 if self.target.carbon_path:
482 carbon_path=encode_utf8(self.target.carbon_path)
483 b.write(struct.pack(b'>hh', TAG_CARBON_PATH,
484 len(carbon_path)))
485 b.write(carbon_path)
486 if len(carbon_path) & 1:
487 b.write(b'\0')
488
489 if self.volume.appleshare_info:
490 ai = self.volume.appleshare_info
491 if ai.zone:
492 b.write(struct.pack(b'>hh', TAG_APPLESHARE_ZONE,
493 len(ai.zone)))
494 b.write(ai.zone)
495 if len(ai.zone) & 1:
496 b.write(b'\0')
497 if ai.server:
498 b.write(struct.pack(b'>hh', TAG_APPLESHARE_SERVER_NAME,
499 len(ai.server)))
500 b.write(ai.server)
501 if len(ai.server) & 1:
502 b.write(b'\0')
503 if ai.username:
504 b.write(struct.pack(b'>hh', TAG_APPLESHARE_USERNAME,
505 len(ai.username)))
506 b.write(ai.username)
507 if len(ai.username) & 1:
508 b.write(b'\0')
509
510 if self.volume.driver_name:
511 driver_name = encode_utf8(self.volume.driver_name)
512 b.write(struct.pack(b'>hh', TAG_DRIVER_NAME,
513 len(driver_name)))
514 b.write(driver_name)
515 if len(driver_name) & 1:
516 b.write(b'\0')
517
518 if self.volume.network_mount_info:
519 b.write(struct.pack(b'>hh', TAG_NETWORK_MOUNT_INFO,
520 len(self.volume.network_mount_info)))
521 b.write(self.volume.network_mount_info)
522 if len(self.volume.network_mount_info) & 1:
523 b.write(b'\0')
524
525 if self.volume.dialup_info:
526 b.write(struct.pack(b'>hh', TAG_DIALUP_INFO,
527 len(self.volume.network_mount_info)))
528 b.write(self.volume.network_mount_info)
529 if len(self.volume.network_mount_info) & 1:
530 b.write(b'\0')
531
532 utf16 = decode_utf8(self.target.filename)\
533 .replace(':','/').encode('utf-16-be')
534 b.write(struct.pack(b'>hhh', TAG_UNICODE_FILENAME,
535 len(utf16) + 2,
536 len(utf16) // 2))
537 b.write(utf16)
538
539 utf16 = decode_utf8(self.volume.name)\
540 .replace(':','/').encode('utf-16-be')
541 b.write(struct.pack(b'>hhh', TAG_UNICODE_VOLUME_NAME,
542 len(utf16) + 2,
543 len(utf16) // 2))
544 b.write(utf16)
545
546 if self.target.posix_path:
547 posix_path = encode_utf8(self.target.posix_path)
548 b.write(struct.pack(b'>hh', TAG_POSIX_PATH,
549 len(posix_path)))
550 b.write(posix_path)
551 if len(posix_path) & 1:
552 b.write(b'\0')
553
554 if self.volume.posix_path:
555 posix_path = encode_utf8(self.volume.posix_path)
556 b.write(struct.pack(b'>hh', TAG_POSIX_PATH_TO_MOUNTPOINT,
557 len(posix_path)))
558 b.write(posix_path)
559 if len(posix_path) & 1:
560 b.write(b'\0')
561
562 if self.volume.disk_image_alias:
563 d = self.volume.disk_image_alias.to_bytes()
564 b.write(struct.pack(b'>hh', TAG_RECURSIVE_ALIAS_OF_DISK_IMAGE,
565 len(d)))
566 b.write(d)
567 if len(d) & 1:
568 b.write(b'\0')
569
570 if self.target.user_home_prefix_len is not None:
571 b.write(struct.pack(b'>hhh', TAG_USER_HOME_LENGTH_PREFIX,
572 2, self.target.user_home_prefix_len))
573
574 for t,v in self.extra:
575 b.write(struct.pack(b'>hh', t, len(v)))
576 b.write(v)
577 if len(v) & 1:
578 b.write(b'\0')
579
580 b.write(struct.pack(b'>hh', -1, 0))
581
582 blen = b.tell() - pos
583 b.seek(pos + 4, os.SEEK_SET)
584 b.write(struct.pack(b'>h', blen))
585
586 def to_bytes(self):
587 """Returns the binary representation for this :class:`Alias`."""
588 with io.BytesIO() as b:
589 self._to_fd(b)
590 return b.getvalue()
591
592 def __str__(self):
593 return '<Alias target=%s>' % self.target.filename
594
595 def __repr__(self):
596 values = []
597 if self.appinfo != b'\0\0\0\0':
598 values.append('appinfo=%r' % self.appinfo)
599 if self.version != 2:
600 values.append('version=%r' % self.version)
601 if self.volume is not None:
602 values.append('volume=%r' % self.volume)
603 if self.target is not None:
604 values.append('target=%r' % self.target)
605 if self.extra:
606 values.append('extra=%r' % self.extra)
607 return 'Alias(%s)' % ','.join(values)