"""MH interface -- purely object-oriented (well, almost)
mh = mhlib.MH() # use default mailbox directory and profile
mh = mhlib.MH(mailbox) # override mailbox location (default from profile)
mh = mhlib.MH(mailbox, profile) # override mailbox and profile
mh.error(format, ...) # print error message -- can be overridden
s = mh.getprofile(key) # profile entry (None if not set)
path = mh.getpath() # mailbox pathname
name = mh.getcontext() # name of current folder
mh.setcontext(name) # set name of current folder
list = mh.listfolders() # names of top-level folders
list = mh.listallfolders() # names of all folders, including subfolders
list = mh.listsubfolders(name) # direct subfolders of given folder
list = mh.listallsubfolders(name) # all subfolders of given folder
mh.makefolder(name) # create new folder
mh.deletefolder(name) # delete folder -- must have no subfolders
f = mh.openfolder(name) # new open folder object
f.error(format, ...) # same as mh.error(format, ...)
path = f.getfullname() # folder's full pathname
path = f.getsequencesfilename() # full pathname of folder's sequences file
path = f.getmessagefilename(n) # full pathname of message n in folder
list = f.listmessages() # list of messages in folder (as numbers)
n = f.getcurrent() # get current message
f.setcurrent(n) # set current message
list = f.parsesequence(seq) # parse msgs syntax into list of messages
n = f.getlast() # get last message (0 if no messagse)
f.setlast(n) # set last message (internal use only)
dict = f.getsequences() # dictionary of sequences in folder {name: list}
f.putsequences(dict) # write sequences back to folder
f.createmessage(n, fp) # add message from file f as number n
f.removemessages(list) # remove messages in list from folder
f.refilemessages(list, tofolder) # move messages in list to other folder
f.movemessage(n, tofolder, ton) # move one message to a given destination
f.copymessage(n, tofolder, ton) # copy one message to a given destination
m = f.openmessage(n) # new open message object (costs a file descriptor)
m is a derived class of mimetools.Message(rfc822.Message), with:
s = m.getheadertext() # text of message's headers
s = m.getheadertext(pred) # text of message's headers, filtered by pred
s = m.getbodytext() # text of message's body, decoded
s = m.getbodytext(0) # text of message's body, not decoded
# XXX To do, functionality:
# XXX To do, organization:
# - move IntSet to separate file
# - move most Message functionality to module mimetools
MH_PROFILE
= '~/.mh_profile'
MH_SEQUENCES
= '.mh_sequences'
from bisect
import bisect
__all__
= ["MH","Error","Folder","Message"]
"""Class representing a particular collection of folders.
Optional constructor arguments are the pathname for the directory
containing the collection, and the MH profile to use.
If either is omitted or empty a default is used; the default
directory is taken from the MH profile if it is specified there."""
def __init__(self
, path
= None, profile
= None):
if profile
is None: profile
= MH_PROFILE
self
.profile
= os
.path
.expanduser(profile
)
if path
is None: path
= self
.getprofile('Path')
if not os
.path
.isabs(path
) and path
[0] != '~':
path
= os
.path
.join('~', path
)
path
= os
.path
.expanduser(path
)
if not os
.path
.isdir(path
): raise Error
, 'MH() path not found'
"""String representation."""
return 'MH(%r, %r)' % (self
.path
, self
.profile
)
def error(self
, msg
, *args
):
"""Routine to print an error. May be overridden by a derived class."""
sys
.stderr
.write('MH error: %s\n' % (msg
% args
))
def getprofile(self
, key
):
"""Return a profile entry, None if not found."""
return pickline(self
.profile
, key
)
"""Return the path (the name of the collection's directory)."""
"""Return the name of the current folder."""
context
= pickline(os
.path
.join(self
.getpath(), 'context'),
if not context
: context
= 'inbox'
def setcontext(self
, context
):
"""Set the name of the current folder."""
fn
= os
.path
.join(self
.getpath(), 'context')
f
.write("Current-Folder: %s\n" % context
)
"""Return the names of the top-level folders."""
for name
in os
.listdir(path
):
fullname
= os
.path
.join(path
, name
)
if os
.path
.isdir(fullname
):
def listsubfolders(self
, name
):
"""Return the names of the subfolders in a given folder
(prefixed with the given folder name)."""
fullname
= os
.path
.join(self
.path
, name
)
# Get the link count so we can avoid listing folders
# that have no subfolders.
nlinks
= os
.stat(fullname
).st_nlink
subnames
= os
.listdir(fullname
)
fullsubname
= os
.path
.join(fullname
, subname
)
if os
.path
.isdir(fullsubname
):
name_subname
= os
.path
.join(name
, subname
)
subfolders
.append(name_subname
)
# Stop looking for subfolders when
def listallfolders(self
):
"""Return the names of all folders and subfolders, recursively."""
return self
.listallsubfolders('')
def listallsubfolders(self
, name
):
"""Return the names of subfolders in a given folder, recursively."""
fullname
= os
.path
.join(self
.path
, name
)
# Get the link count so we can avoid listing folders
# that have no subfolders.
nlinks
= os
.stat(fullname
).st_nlink
subnames
= os
.listdir(fullname
)
if subname
[0] == ',' or isnumeric(subname
): continue
fullsubname
= os
.path
.join(fullname
, subname
)
if os
.path
.isdir(fullsubname
):
name_subname
= os
.path
.join(name
, subname
)
subfolders
.append(name_subname
)
if not os
.path
.islink(fullsubname
):
subsubfolders
= self
.listallsubfolders(
subfolders
= subfolders
+ subsubfolders
# Stop looking for subfolders when
def openfolder(self
, name
):
"""Return a new Folder object for the named folder."""
return Folder(self
, name
)
def makefolder(self
, name
):
"""Create a new folder (or raise os.error if it cannot be created)."""
protect
= pickline(self
.profile
, 'Folder-Protect')
if protect
and isnumeric(protect
):
os
.mkdir(os
.path
.join(self
.getpath(), name
), mode
)
def deletefolder(self
, name
):
"""Delete a folder. This removes files in the folder but not
subdirectories. Raise os.error if deleting the folder itself fails."""
fullname
= os
.path
.join(self
.getpath(), name
)
for subname
in os
.listdir(fullname
):
fullsubname
= os
.path
.join(fullname
, subname
)
self
.error('%s not deleted, continuing...' %
numericprog
= re
.compile('^[1-9][0-9]*$')
return numericprog
.match(str) is not None
"""Class representing a particular folder."""
def __init__(self
, mh
, name
):
if not os
.path
.isdir(self
.getfullname()):
raise Error
, 'no folder %s' % name
"""String representation."""
return 'Folder(%r, %r)' % (self
.mh
, self
.name
)
"""Error message handler."""
"""Return the full pathname of the folder."""
return os
.path
.join(self
.mh
.path
, self
.name
)
def getsequencesfilename(self
):
"""Return the full pathname of the folder's sequences file."""
return os
.path
.join(self
.getfullname(), MH_SEQUENCES
)
def getmessagefilename(self
, n
):
"""Return the full pathname of a message in the folder."""
return os
.path
.join(self
.getfullname(), str(n
))
def listsubfolders(self
):
"""Return list of direct subfolders."""
return self
.mh
.listsubfolders(self
.name
)
def listallsubfolders(self
):
"""Return list of all subfolders."""
return self
.mh
.listallsubfolders(self
.name
)
"""Return the list of messages currently present in the folder.
As a side effect, set self.last to the last message (or 0)."""
match
= numericprog
.match
for name
in os
.listdir(self
.getfullname()):
messages
= map(int, messages
)
"""Return the set of sequences for the folder."""
fullname
= self
.getsequencesfilename()
self
.error('bad sequence in %s: %s' %
(fullname
, line
.strip()))
value
= IntSet(fields
[1].strip(), ' ').tolist()
def putsequences(self
, sequences
):
"""Write the set of sequences back to the folder."""
fullname
= self
.getsequencesfilename()
for key
, seq
in sequences
.iteritems():
if not f
: f
= open(fullname
, 'w')
f
.write('%s: %s\n' % (key
, s
.tostring()))
"""Return the current message. Raise Error when there is none."""
seqs
= self
.getsequences()
except (ValueError, KeyError):
raise Error
, "no cur message"
"""Set the current message."""
updateline(self
.getsequencesfilename(), 'cur', str(n
), 0)
def parsesequence(self
, seq
):
"""Parse an MH sequence specification into a message list.
Attempt to mimic mh-sequence(5) as close as possible.
Also attempt to mimic observed behavior regarding which
conditions cause which error messages."""
# XXX Still not complete (see mh-format(5)).
# - 'prev', 'next' as count
# - Sequence-Negation option
all
= self
.listmessages()
# Observed behavior: test for empty folder is done first
raise Error
, "no messages in %s" % self
.name
# Common case first: all is frequently the default
# Test for X:Y before X-Y because 'seq:-n' matches both
head
, dir, tail
= seq
[:i
], '', seq
[i
+1:]
dir, tail
= tail
[:1], tail
[1:]
raise Error
, "bad message list %s" % seq
except (ValueError, OverflowError):
# Can't use sys.maxint because of i+count below
anchor
= self
._parseindex
(head
, all
)
seqs
= self
.getsequences()
msg
= "bad message list %s" % seq
raise Error
, msg
, sys
.exc_info()[2]
raise Error
, "sequence %s empty" % head
if head
in ('prev', 'last'):
return all
[max(0, i
-count
):i
]
i
= bisect(all
, anchor
-1)
begin
= self
._parseindex
(seq
[:i
], all
)
end
= self
._parseindex
(seq
[i
+1:], all
)
raise Error
, "bad message list %s" % seq
# Neither X:Y nor X-Y; must be a number or a (pseudo-)sequence
n
= self
._parseindex
(seq
, all
)
seqs
= self
.getsequences()
msg
= "bad message list %s" % seq
raise Error
, "message %d doesn't exist" % n
raise Error
, "no %s message" % seq
def _parseindex(self
, seq
, all
):
"""Internal: parse a message number (or cur, first, etc.)."""
except (OverflowError, ValueError):
raise Error
, "no next message"
raise Error
, "no prev message"
raise Error
, "no prev message"
def openmessage(self
, n
):
"""Open a message -- returns a Message object."""
def removemessages(self
, list):
"""Remove one or more messages -- may raise os.error."""
path
= self
.getmessagefilename(n
)
commapath
= self
.getmessagefilename(',' + str(n
))
os
.rename(path
, commapath
)
self
.removefromallsequences(deleted
)
raise os
.error
, errors
[0]
raise os
.error
, ('multiple errors:', errors
)
def refilemessages(self
, list, tofolder
, keepsequences
=0):
"""Refile one or more messages -- may raise os.error.
'tofolder' is an open folder object."""
ton
= tofolder
.getlast() + 1
path
= self
.getmessagefilename(n
)
topath
= tofolder
.getmessagefilename(ton
)
shutil
.copy2(path
, topath
)
except (IOError, os
.error
), msg
:
tofolder
._copysequences
(self
, refiled
.items())
self
.removefromallsequences(refiled
.keys())
raise os
.error
, errors
[0]
raise os
.error
, ('multiple errors:', errors
)
def _copysequences(self
, fromfolder
, refileditems
):
"""Helper for refilemessages() to copy sequences."""
fromsequences
= fromfolder
.getsequences()
tosequences
= self
.getsequences()
for name
, seq
in fromsequences
.items():
toseq
= tosequences
[name
]
for fromn
, ton
in refileditems
:
tosequences
[name
] = toseq
self
.putsequences(tosequences
)
def movemessage(self
, n
, tofolder
, ton
):
"""Move one message over a specific destination message,
which may or may not already exist."""
path
= self
.getmessagefilename(n
)
# Open it to check that it exists
topath
= tofolder
.getmessagefilename(ton
)
backuptopath
= tofolder
.getmessagefilename(',%d' % ton
)
os
.rename(topath
, backuptopath
)
shutil
.copy2(path
, topath
)
self
.removefromallsequences([n
])
def copymessage(self
, n
, tofolder
, ton
):
"""Copy one message over a specific destination message,
which may or may not already exist."""
path
= self
.getmessagefilename(n
)
# Open it to check that it exists
topath
= tofolder
.getmessagefilename(ton
)
backuptopath
= tofolder
.getmessagefilename(',%d' % ton
)
os
.rename(topath
, backuptopath
)
shutil
.copy2(path
, topath
)
def createmessage(self
, n
, txt
):
"""Create a message, with text from the open file txt."""
path
= self
.getmessagefilename(n
)
backuppath
= self
.getmessagefilename(',%d' % n
)
os
.rename(path
, backuppath
)
def removefromallsequences(self
, list):
"""Remove one or more messages from all sequences (including last)
-- but not from 'cur'!!!"""
if hasattr(self
, 'last') and self
.last
in list:
sequences
= self
.getsequences()
for name
, seq
in sequences
.items():
self
.putsequences(sequences
)
"""Return the last message number."""
if not hasattr(self
, 'last'):
self
.listmessages() # Set self.last
"""Set the last message number."""
if hasattr(self
, 'last'):
class Message(mimetools
.Message
):
def __init__(self
, f
, n
, fp
= None):
path
= f
.getmessagefilename(n
)
mimetools
.Message
.__init
__(self
, fp
)
"""String representation."""
return 'Message(%s, %s)' % (repr(self
.folder
), self
.number
)
def getheadertext(self
, pred
= None):
"""Return the message's header text as a string. If an
argument is specified, it is used as a filter predicate to
decide which headers to return (its argument is the header
name converted to lower case)."""
return ''.join(self
.headers
)
for line
in self
.headers
:
if not line
[0].isspace():
hit
= pred(line
[:i
].lower())
if hit
: headers
.append(line
)
def getbodytext(self
, decode
= 1):
"""Return the message's body text as string. This undoes a
Content-Transfer-Encoding, but does not interpret other MIME
features (e.g. multipart messages). To suppress decoding,
pass 0 as an argument."""
self
.fp
.seek(self
.startofbody
)
encoding
= self
.getencoding()
if not decode
or encoding
in ('', '7bit', '8bit', 'binary'):
from StringIO
import StringIO
mimetools
.decode(self
.fp
, output
, encoding
)
"""Only for multipart messages: return the message's body as a
list of SubMessage objects. Each submessage object behaves
(almost) as a Message object."""
if self
.getmaintype() != 'multipart':
raise Error
, 'Content-Type is not multipart/*'
bdry
= self
.getparam('boundary')
raise Error
, 'multipart/* without boundary param'
self
.fp
.seek(self
.startofbody
)
mf
= multifile
.MultiFile(self
.fp
)
n
= "%s.%r" % (self
.number
, 1 + len(parts
))
part
= SubMessage(self
.folder
, n
, mf
)
"""Return body, either a string or a list of messages."""
if self
.getmaintype() == 'multipart':
return self
.getbodyparts()
return self
.getbodytext()
class SubMessage(Message
):
def __init__(self
, f
, n
, fp
):
Message
.__init
__(self
, f
, n
, fp
)
if self
.getmaintype() == 'multipart':
self
.body
= Message
.getbodyparts(self
)
self
.body
= Message
.getbodytext(self
)
self
.bodyencoded
= Message
.getbodytext(self
, decode
=0)
# XXX If this is big, should remember file pointers
"""String representation."""
f
, n
, fp
= self
.folder
, self
.number
, self
.fp
return 'SubMessage(%s, %s, %s)' % (f
, n
, fp
)
def getbodytext(self
, decode
= 1):
if type(self
.body
) == type(''):
if type(self
.body
) == type([]):
"""Class implementing sets of integers.
This is an efficient representation for sets consisting of several
continuous ranges, e.g. 1-100,200-400,402-1000 is represented
internally as a list of three pairs: [(1,100), (200,400),
(402,1000)]. The internal representation is always kept normalized.
The constructor has up to three arguments:
- the string used to initialize the set (default ''),
- the separator between ranges (default ',')
- the separator between begin and end of a range (default '-')
The separators must be strings (not regexprs) and should be different.
The tostring() function yields a string that can be passed to another
IntSet constructor; __repr__() is a valid IntSet constructor itself.
# XXX The default begin/end separator means that negative numbers are
# not supported very well.
# XXX There are currently no operations to remove set elements.
def __init__(self
, data
= None, sep
= ',', rng
= '-'):
if data
: self
.fromstring(data
)
def __cmp__(self
, other
):
return cmp(self
.pairs
, other
.pairs
)
return 'IntSet(%r, %r, %r)' % (self
.tostring(), self
.sep
, self
.rng
)
while i
< len(self
.pairs
):
alo
, ahi
= self
.pairs
[i
-1]
self
.pairs
[i
-1:i
+1] = [(alo
, max(ahi
, bhi
))]
for lo
, hi
in self
.pairs
:
if lo
== hi
: t
= repr(lo
)
else: t
= repr(lo
) + self
.rng
+ repr(hi
)
if s
: s
= s
+ (self
.sep
+ t
)
for lo
, hi
in self
.pairs
:
def fromlist(self
, list):
new
.pairs
= self
.pairs
[:]
return self
.pairs
[-1][-1]
for lo
, hi
in self
.pairs
:
if lo
<= x
<= hi
: return True
for i
in range(len(self
.pairs
)):
if x
< lo
: # Need to insert before
self
.pairs
.insert(i
, (x
, x
))
if i
> 0 and x
-1 == self
.pairs
[i
-1][1]:
if x
<= hi
: # Already in set
self
.pairs
.append((x
, x
))
def addpair(self
, xlo
, xhi
):
self
.pairs
.append((xlo
, xhi
))
def fromstring(self
, data
):
for part
in data
.split(self
.sep
):
for subp
in part
.split(self
.rng
):
new
.append((list[0], list[0]))
elif len(list) == 2 and list[0] <= list[1]:
new
.append((list[0], list[1]))
raise ValueError, 'bad data passed to IntSet'
self
.pairs
= self
.pairs
+ new
# Subroutines to read/write entries in .mh_profile and .mh_sequences
def pickline(file, key
, casefold
= 1):
pat
= re
.escape(key
) + ':'
prog
= re
.compile(pat
, casefold
and re
.IGNORECASE
)
if not line
or not line
[0].isspace():
def updateline(file, key
, value
, casefold
= 1):
pat
= re
.escape(key
) + ':(.*)\n'
prog
= re
.compile(pat
, casefold
and re
.IGNORECASE
)
newline
= '%s: %s\n' % (key
, value
)
for i
in range(len(lines
)):
os
.rename(tempfile
, file)
os
.system('rm -rf $HOME/Mail/@test')
def do(s
): print s
; print eval(s
)
do('mh.listallfolders()')
testfolders
= ['@test', '@test/test1', '@test/test2',
'@test/test1/test11', '@test/test1/test12',
'@test/test1/test11/test111']
for t
in testfolders
: do('mh.makefolder(%r)' % (t
,))
do('mh.listsubfolders(\'@test\')')
do('mh.listallsubfolders(\'@test\')')
f
= mh
.openfolder('@test')
do('f.listallsubfolders()')
seqs
['foo'] = IntSet('1-10 12-20', ' ').tolist()
for t
in reversed(testfolders
): do('mh.deletefolder(%r)' % (t
,))
context
= mh
.getcontext()
f
= mh
.openfolder(context
)
for seq
in ['first', 'last', 'cur', '.', 'prev', 'next',
'first:3', 'last:3', 'cur:3', 'cur:-3',
'1:3', '1:-3', '100:3', '100:-3', '10000:3', '10000:-3',
do('f.parsesequence(%r)' % (seq
,))
stuff
= os
.popen("pick %r 2>/dev/null" % (seq
,)).read()
list = map(int, stuff
.split())
if __name__
== '__main__':