SMTP TLS Reporting really works
Get advice when your MTA cannot be reached

Intro

To enable TLSRPT, publish a DNS TXT record at _smtp._tls.yourdomain.name. Refer to RFC 8460 for the policy syntax. Basically, you decide whether you want to receive reports via mail (a mailto: URI) or via web (an https: URI). Why would you want to receive them? And why would you want to use this script, or a similar one?
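As an illustration of the record syntax, here is a minimal sketch that splits a TLSRPT record into its report URIs. The function name parse_tlsrpt_record and the example.com addresses are made up for this example; only the v=TLSRPTv1 and rua= fields come from RFC 8460, and the sketch ignores any extension fields.

```python
def parse_tlsrpt_record(txt):
    """Return the list of report URIs found in a TLSRPT TXT record."""
    fields = {}
    for item in txt.split(";"):
        item = item.strip()
        if not item:
            continue
        # Split on the first "=" only, since URIs may contain "=" too
        key, sep, value = item.partition("=")
        if not sep:
            raise ValueError("malformed field: " + item)
        fields[key.strip()] = value.strip()
    if fields.get("v") != "TLSRPTv1":
        raise ValueError("not a TLSRPTv1 record")
    # rua may hold several comma-separated URIs (mailto: or https:)
    uris = [u.strip() for u in fields.get("rua", "").split(",") if u.strip()]
    if not uris:
        raise ValueError("missing rua")
    return uris

# Hypothetical record, as it might be published at _smtp._tls.example.com
record = "v=TLSRPTv1; rua=mailto:tlsrpt@example.com,https://reports.example.com/v1/tlsrpt"
print(parse_tlsrpt_record(record))
```

A record with a single mailto: URI is enough if you only want reports by mail.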

After you enable it, some mail domains will send you daily reports saying that all is well. However, if you block spammers at the connection level, using tools such as fail2ban or ipqbdb, sooner or later you will also block some too-big-to-block (TBTB) senders. TLSRPT allows those senders to notify you that a TLS connection attempt failed, providing the IP address(es) that you may want to redeem.

The present script reads the report attached to the input message, looking for failures. If it finds any, it adds a special header field, TLS-Report-Has-Failures, that you can check in your mail filter in order to redirect the message to additional recipients or folders where it will catch your attention. It also extracts noteworthy information, so that you don't have to open the compressed attachment.
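To get a feel for what such an input message looks like, the following sketch builds a sample one and unpacks it again. Everything in it is made up for testing (addresses, counts, the helper name build_sample_message); only the JSON field names follow RFC 8460. Real reports use multipart/report, whereas this sketch produces multipart/mixed, which is enough to show the text part and the gzip attachment.

```python
import gzip
import json
import zlib
from email import message_from_string
from email.message import EmailMessage

# Made-up report; only the field names are taken from RFC 8460.
report = {
    "organization-name": "Example Sender",
    "policies": [{
        "policy": {"policy-type": "sts", "policy-domain": "example.com"},
        "summary": {"total-successful-session-count": 0,
                    "total-failure-session-count": 1},
        "failure-details": [{"result-type": "validation-failure",
                             "sending-mta-ip": "192.0.2.1",
                             "failed-session-count": 1}],
    }],
}

def build_sample_message(report):
    """Return a two-part message: plain text plus gzipped JSON report."""
    msg = EmailMessage()
    msg["From"] = "noreply-tlsrpt@sender.example"
    msg["To"] = "tlsrpt@example.com"
    msg["Subject"] = "Report Domain: example.com"
    msg.set_content("This is an aggregate TLS report.\n")
    msg.add_attachment(gzip.compress(json.dumps(report).encode("utf-8")),
                       maintype="application", subtype="tlsrpt+gzip",
                       filename="report.json.gz")
    return msg.as_string()

raw = build_sample_message(report)

# Read it back the way a filter would: parse, take the second part,
# and let zlib handle the gzip header (MAX_WBITS | 16).
parsed = message_from_string(raw)
text_part, attachment = parsed.get_payload()
data = zlib.decompress(attachment.get_payload(decode=True), zlib.MAX_WBITS | 16)
decoded = json.loads(data)

print(attachment.get_content_type())
print(decoded["policies"][0]["failure-details"][0]["sending-mta-ip"])
```

Writing raw to a file gives you a test message that you can pipe into the script before wiring it into your mail filter.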

Of course, redeeming TBTB addresses is not the only use of TLSRPT. For example, you can derive statistics to be compared with DMARC reports.
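As a rough idea of what such statistics could look like, the sketch below sums the session counts of several already-decoded reports per reporting organization and policy type. The function name tally and the sample numbers are invented for illustration; the comparison with DMARC data is not shown.

```python
from collections import defaultdict

def tally(reports):
    """Sum session counts per (reporting organization, policy type)."""
    totals = defaultdict(lambda: {"ok": 0, "failed": 0})
    for rep in reports:
        org = rep.get("organization-name", "unknown")
        for pol in rep.get("policies", []):
            key = (org, pol["policy"]["policy-type"])
            summary = pol.get("summary", {})
            totals[key]["ok"] += summary.get("total-successful-session-count", 0)
            totals[key]["failed"] += summary.get("total-failure-session-count", 0)
    return dict(totals)

# Made-up data standing in for two daily reports from the same sender
sample_reports = [
    {"organization-name": "Example Sender",
     "policies": [{"policy": {"policy-type": "sts", "policy-domain": "example.com"},
                   "summary": {"total-successful-session-count": 12,
                               "total-failure-session-count": 0}}]},
    {"organization-name": "Example Sender",
     "policies": [{"policy": {"policy-type": "sts", "policy-domain": "example.com"},
                   "summary": {"total-successful-session-count": 3,
                               "total-failure-session-count": 2}}]},
]

for (org, ptype), counts in sorted(tally(sample_reports).items()):
    print(org, ptype, counts)
```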

Python script

#!/usr/bin/python3
""" read tls reports, see rfc 8460

This is designed to be called via Maildrop's xfilter command.  It adds
STS summary to the message text, and adds a TLS-Report-Has-Failures
header field in case the report contains failure-details.

using python3

"""

import email
import zlib
import json
import sys

class UnexpectedContent(RuntimeError):
   pass

class CheckResult:
   """ Plain container for text, failures and internalerr. """
   pass

def check_json(data):
   """ Parse the JSON report and collect summary text and failure count. """
   rep = CheckResult()
   rep.text = ''
   rep.failures = 0
   rep.internalerr = ''

   try:
      obj = json.loads(data, strict=False)
      for pol in obj['policies']:
         if pol['policy']['policy-type'] == 'sts':
            rep.text += str(pol['summary']) + "\n"
         if 'failure-details' in pol:
            rep.text += pol['policy']['policy-type'] + " failure\n"
            rep.failures += pol['summary']['total-failure-session-count']
            for det in pol['failure-details']:
               rep.text += "sending-mta-ip: " + det['sending-mta-ip'] + "\n"
   except Exception as e:
      # Malformed or unexpected JSON counts as a failure worth a look
      rep.text += '\n\nBad json data:\n' + str(e) + "\n"
      rep.failures += 1
      rep.internalerr = 'Bad json data:' + str(e)
   return rep

def bad_attachment(txt):
   """ Build a result that flags an unreadable attachment. """
   rep = CheckResult()
   rep.text = txt
   rep.failures = 1
   rep.internalerr = txt
   return rep

def check_normal(msg):
   """ Check that msg is plain text with attachment, no failures.
   Add summary to the msg text.
   If not normal, add TLS-Report-Has-Failures header field.
   """
   error = ''
   rep = None
   if msg.is_multipart():
      parts = msg.get_payload()
      if len(parts) == 2:
         stream = None
         if parts[0].get_content_maintype() == 'text':
            part = parts[1]
            p = part.get_content_type()
            if p in ('application/gzip', 'application/tlsrpt+gzip'):
               # https://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check#22310760
               # MAX_WBITS|16 expects a gzip header
               try:
                  stream = zlib.decompress(part.get_payload(decode=True), zlib.MAX_WBITS|16)
               except Exception as e:
                  rep = bad_attachment(str(e))
            elif p == 'application/octet-stream':
               # MAX_WBITS|32 auto-detects a gzip or zlib header
               try:
                  stream = zlib.decompress(part.get_payload(decode=True), zlib.MAX_WBITS|32)
               except Exception as e:
                  rep = bad_attachment(str(e))
            elif p == 'application/json':
               stream = part.get_payload(decode=True)
            else:
               rep = bad_attachment('bad attachment type: ' + p)
            if rep is None:
               rep = check_json(stream)
         else:
            error = 'bad text type: ' + parts[0].get_content_type()
      else:
         error = 'bad attachment count: ' + str(len(parts) - 1)
   else:
      error = 'missing attachment'

   if error == '':
      part = parts[0]
      text = part.get_payload(decode=True).decode('utf-8', 'ignore') + "\n"
      text += rep.text + "\n"
      # Drop the old transfer encoding, otherwise set_payload() keeps the
      # header and stores the new text without encoding it accordingly.
      del part['Content-Transfer-Encoding']
      part.set_payload(text, 'utf-8')
      if rep.failures > 0:
         msg.add_header('TLS-Report-Has-Failures', str(rep.failures))
      if rep.internalerr != '':
         sys.stderr.write('TLSrpt interr: ' + rep.internalerr + "\n")
   else:
      msg.add_header('TLS-Report-Has-Failures', error)
      sys.stderr.write('TLSrpt error: ' + error + "\n")


if __name__ == '__main__':
   raw = ''
   msg = None
   try:
      raw = sys.stdin.read()
      msg = email.message_from_string(raw)
      check_normal(msg)
   except Exception as e:
      import traceback
      traceback.print_exc(4, file=sys.stderr)
      sys.stderr.write('TLSrpt exception: ' + str(e) + "\n")
   finally:
      # Always hand a message back to the mail filter: the parsed one,
      # or the raw input if it could not even be parsed.
      if msg is not None:
         sys.stdout.write(msg.as_string(unixfrom=False) + "\n")
      else:
         sys.stdout.write(raw)
      sys.exit(0)

zero rights

Copy and paste the script into your editor of choice. Note that it uses \n line endings. Add \r if needed. Also, the script writes to stderr. You may want to replace that with syslog if your filter doesn't do that for you. You may also want to tweak email.policy. When you're done, save the file where your mail filter can reach it.
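The tweaks mentioned above could look roughly like the following sketch. The helper names serialize and log_error are made up for this example and are not part of the script. The sketch assumes that a cloned compat32 policy with linesep set to \r\n is an acceptable way to get CRLF output, and that the syslog module (Unix only) is available; otherwise it falls back to stderr.

```python
import email
import email.policy
import sys

try:
    import syslog  # Unix only
except ImportError:
    syslog = None

# Same legacy policy that message_from_string() uses by default,
# except that lines end with \r\n instead of \n.
CRLF_POLICY = email.policy.compat32.clone(linesep="\r\n")

def serialize(msg, crlf=False):
    """Return the message as a string, optionally with CRLF line endings."""
    if crlf:
        return msg.as_string(unixfrom=False, policy=CRLF_POLICY)
    return msg.as_string(unixfrom=False)

def log_error(text):
    """Send an error line to syslog if possible, else to stderr."""
    if syslog is not None:
        syslog.syslog(syslog.LOG_MAIL | syslog.LOG_ERR, "TLSrpt: " + text)
    else:
        sys.stderr.write("TLSrpt: " + text + "\n")

sample = email.message_from_string("Subject: test\n\nline one\nline two\n")
out = serialize(sample, crlf=True)
print(repr(out))
```

In the script, serialize(msg, crlf=True) would take the place of msg.as_string(unixfrom=False), and log_error() the place of the sys.stderr.write() calls.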