# -*- python -*-
#
# This program is an example of a more complex maildirproc
# configuration. (It's actually a stripped down version of the
# author's configuration.)
#
#                               :
#                               |
#                               V
#                            getmail
#                               |
#                               | (auto
#    (spam training)            V training)  (ham training)
#     incoming.spam     incoming.unknown      incoming.ham
#            \                 / \                 /
#             \          spam /   \ ham           /
#              '-----. .-----'     '-----. .-----'
#                    | |                 | |
#                    V V                 V V
#                 mail.spam       incoming.unsorted
#                                 / / / / \ \ \ \ \
#                                / / / /   \ \ \ \ \
#                               ' ' ' '     ' ' ' ' '
#                              ' ' ' '       ' ' ' ' '
#
# incoming.ham: Mail mistakenly classified as spam is moved here
# manually. Messages are unregistered as spam, registered as ham and
# moved to incoming.unsorted.
#
# incoming.spam: Mail mistakenly classified as ham is moved here
# manually. Messages are unregistered as ham, registered as spam and
# moved to mail.spam.
#
# incoming.unknown: This is the main entrance; getmail delivers mail
# here. Messages that bogofilter thinks are spam are registered as
# spam and moved to mail.spam. Other messages are registered as ham
# and moved to incoming.unsorted.
#
# incoming.unsorted: Messages are moved, copied and/or forwarded to
# suitable places. Messages can be moved here manually by a mail
# client if they need to be resorted without being run through the
# spam classification process.
#
# NOTE: the name "processor" is not defined in this file; presumably
# maildirproc supplies it when it executes this rc file.

import re
import subprocess


def autolearn_and_check_spam(mail):
    """Classify *mail* with bogofilter, letting it auto-update its database.

    Runs "bogofilter -u -v -I <mail.path>", logs the combined
    stdout/stderr output and interprets the exit status.

    Returns True only when bogofilter reports spam (exit status 0).
    Ham (1) and unsure (2) both return False. Any other exit status is
    logged as an error and also treated as "not spam", so a broken
    bogofilter never causes mail to be filed as spam.
    """
    p = subprocess.Popen(
        ["bogofilter", "-u", "-v", "-I", mail.path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # stderr is merged into stdout, so the second element is always None.
    (output, _) = p.communicate()
    processor.log("*** Bogofilter result: %r" % output.rstrip())
    if p.returncode == 0:
        # Spam.
        return True
    elif p.returncode == 1:
        # Ham.
        return False
    elif p.returncode == 2:
        # Unsure.
        return False
    else:
        processor.log_error(
            "Error running bogofilter: Return code = %d" % p.returncode)
        return False


def register_ham(mail):
    """Unregister *mail* as spam and register it as ham in bogofilter.

    The exit status of bogofilter is not inspected.
    """
    subprocess.call(["bogofilter", "-S", "-n", "-I", mail.path])


def register_spam(mail):
    """Unregister *mail* as ham and register it as spam in bogofilter.

    The exit status of bogofilter is not inspected.
    """
    subprocess.call(["bogofilter", "-N", "-s", "-I", mail.path])


def is_cron_mail(mail):
    """Return a true value if *mail* looks like (Ana)cron output.

    The From header must contain "root" and the Subject header must
    match "^(Cron|Anacron)".
    """
    return (
        mail["from"].contains("root") and
        mail["subject"].matches("^(Cron|Anacron)"))

# ...

######################################################################


def handle_incoming_ham(mail):
    """Handle mail that was manually moved to the ham-training folder.

    The message is re-registered as ham and then queued for sorting.
    """
    processor.log("==> Learning as ham")
    register_ham(mail)
    # The leading dot matters: the sorting handler is registered for
    # ".incoming.unsorted" in handle_mapping below.
    mail.move(".incoming.unsorted")


def handle_incoming_spam(mail):
    """Handle mail that was manually moved to the spam-training folder.

    The message is re-registered as spam and moved to the spam folder.
    """
    processor.log("==> Learning as spam")
    register_spam(mail)
    mail.move(".mail.spam")


def handle_incoming_unknown(mail):
    """Handle newly delivered mail: classify it and route accordingly.

    Spam goes to ".mail.spam"; everything else (ham, unsure, or a
    bogofilter failure) is queued in ".incoming.unsorted" for sorting.
    """
    processor.log("... Autolearning spamicity")
    if autolearn_and_check_spam(mail):
        processor.log("==> Spam detected and learned")
        mail.move(".mail.spam")
    else:
        processor.log("==> Ham detected and learned")
        # Same folder name as the ".incoming.unsorted" key in
        # handle_mapping, so the message is picked up for sorting.
        mail.move(".incoming.unsorted")


def handle_incoming_unsorted(mail):
    """Sort *mail* into its final folder; the first matching rule wins.

    Rules are tried in this order: mailing lists, specific recipient
    addresses, specific sender addresses, cron mail, private Debian
    mail, other private mail, and finally mail sent by the owner
    (BCC copies). Mail matching no rule is moved to ".mail.junk".
    """
    processor.log("... Sorting unsorted mail")

    # Mail to mailing lists.
    for (boxname, listname) in [
            # ...
            (".list.python", "python-list@python.org"),
            # ...
            ]:
        if mail.from_mailing_list(listname):
            processor.log("... Mailing list %r" % listname)
            mail.move(boxname)
            return

    # Mail to specific addresses.
    for (boxname, address_re) in [
            # ...
            (".mail.kofoto-list-owner",
             "kofoto-.*-owner@kofoto\\.rosdahl\\.net"),
            # ...
            ]:
        if mail.target.matches(address_re):
            processor.log("... Mail to %r" % address_re)
            mail.move(boxname)
            return

    # Mail from specific addresses.
    for (boxname, address_re) in [
            # ...
            (".mail.sourceforge",
             "(noreply|jrosdahl)@sourceforge\\.net|"
             "joel\\+sourceforge@rosdahl\\.net"),
            # ...
            ]:
        if mail["from"].matches(address_re):
            processor.log("... Mail from %r" % address_re)
            mail.move(boxname)
            return

    # ...

    # Cron mail.
    if is_cron_mail(mail):
        processor.log("... Cron mail")
        mail.move(".mail.cron")
        return

    # Private Debian mail.
    if mail.target.matches("joel@.*debian\\.org|.*@packages\\.debian\\.org"):
        processor.log("... Private Debian mail")
        mail.move(".mail.debian")
        return

    # Other private mail.
    if mail.target.matches("joel@(rosdahl\\.net|lysator\\.liu\\.se)"):
        processor.log("... Other private mail")
        mail.move(".mail.private")
        return

    # Bcc to myself.
    if mail["from"].contains("joel@debian.org"):
        processor.log("... BCC to joel@debian.org")
        mail.move(".mail.debian")
        return

    # ...

    if mail["from"].contains("joel@rosdahl.net"):
        processor.log("... BCC to joel@rosdahl.net")
        mail.move(".mail.private")
        return

    # Unmatched mail.
    processor.log("... No rule matched")
    mail.move(".mail.junk")

######################################################################

processor.maildir_base = "~/Maildir"
processor.auto_reload_rcfile = True

# Maps each watched folder to the function that handles mail found there.
handle_mapping = {
    ".incoming.ham": handle_incoming_ham,
    ".incoming.spam": handle_incoming_spam,
    ".incoming.unknown": handle_incoming_unknown,
    ".incoming.unsorted": handle_incoming_unsorted,
}
# A concrete list (rather than a dict view) of the folders to watch.
processor.maildirs = list(handle_mapping)

# Dispatch every message the processor yields to its folder's handler.
for mail in processor:
    handle_mapping[mail.maildir](mail)