我的测试代码发送了一封带有附件的电子邮件,并在主题和正文中保存了一个哈希值。然后,我有一个函数可以对其进行哈希搜索,获取 uid 并获取返回附件数据的电子邮件。
我遇到的问题是,当我发送消息然后搜索电子邮件服务器说没有匹配的 uid 的哈希时,但是如果我运行脚本的另一个副本,它确实会找到它!即使第二个脚本先运行!它首先找到它,但原来的没有;虽然是晚了!
输出
$ python test_server_file_functions.py
Creating mail server
S: '* OK Gimap ready for requests from [ip] [data]'
C: '0001 CAPABILITY'
S: '* CAPABILITY IMAP4rev1 UNSELECT IDLE NAMESPACE QUOTA ID XLIST CHILDREN X-GM-EXT-1 XYZZY SASL-IR AUTH=XOAUTH AUTH=XOAUTH2'
S: '0001 OK Thats all she wrote! [data]'
C: '0002 LOGIN "user@gmail.com" "password"'
S: '* CAPABILITY IMAP4rev1 UNSELECT IDLE NAMESPACE QUOTA ID XLIST CHILDREN X-GM-EXT-1 UIDPLUS COMPRESS=DEFLATE ENABLE MOVE'
S: '0002 OK user@gmail.com Anonymous Test authenticated (Success)'
C: '0003 SELECT INBOX'
S: '* FLAGS (\\Answered \\Flagged \\Draft \\Deleted \\Seen)'
S: '* OK [PERMANENTFLAGS (\\Answered \\Flagged \\Draft \\Deleted \\Seen \\*)] Flags permitted.'
S: '* OK [UIDVALIDITY 1] UIDs valid.'
S: '* 0 EXISTS'
S: '* 0 RECENT'
S: '* OK [UIDNEXT 132] Predicted next UID.'
S: '0003 OK [READ-WRITE] INBOX selected. (Success)'
Does not exists
Created mail server
Sending email
Sent email
Waiting 3 minutes to make sure it isn't a simple delay with the email being relayed
Downloading Data...
C: '0004 SEARCH SUBJECT "EMS Data ID: 622904923b1825d5742ed25fb792fafe2e710c40ceea09660a604be8fabac35ae9b006c43c7a992159b8b0df376383830a6d4c54ed5b141c8429a4feec89cd8b"'
S: '* SEARCH'
S: '0004 OK SEARCH completed (Success)'
Unhandled Error
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/mail/imap4.py", line 2455, in _defaultHandler
cmd.finish(rest, self._extraInfo)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/mail/imap4.py", line 382, in finish
d.callback((send, lastLine))
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/internet/defer.py", line 368, in callback
self._startRunCallbacks(result)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/internet/defer.py", line 464, in _startRunCallbacks
self._runCallbacks()
--- <exception caught here> ---
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/Users/user/Documents/gms/gms/mail_backend.py", line 178, in process_download_uid
raise IOError("Hash not found, however database indicates it was uploaded")
exceptions.IOError: Hash not found, however database indicates it was uploaded
There was an error retrieving the email
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/mail/imap4.py", line 2455, in _defaultHandler
cmd.finish(rest, self._extraInfo)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/mail/imap4.py", line 382, in finish
d.callback((send, lastLine))
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/internet/defer.py", line 368, in callback
self._startRunCallbacks(result)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/internet/defer.py", line 464, in _startRunCallbacks
self._runCallbacks()
--- <exception caught here> ---
File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/Users/user/Documents/gms/gms/mail_backend.py", line 178, in process_download_uid
raise IOError("Hash not found, however database indicates it was uploaded")
exceptions.IOError: Hash not found, however database indicates it was uploaded
Quiting...
代码
import os
# logging
from twisted.python import log
import sys
import time
import email
from Utils import BLOCK
# IMAP
from IMAPBackend import connectToIMAPServer, Command
# SMTP
from SMTPBackend import connectToSMTPServer
# Hash Database
from HashDatabase import HashDatabase, hash
# deferreds
from twisted.internet.defer import Deferred, DeferredList, succeed
from twisted.internet.task import deferLater
#reactor
from twisted.internet import reactor
BLOCK_SIZE = BLOCK / 1024 # convert from bytes (needed for FTP) to kilobytes
def createMailServer(username, password, smtp_server, imap_server, hash_db = "hash.db"):
# create smtp connection
smtp_d = connectToSMTPServer(smtp_server, username, password)
# create imap connection
imap_d = connectToIMAPServer(imap_server, username, password)
dl = DeferredList([smtp_d, imap_d])
dl.addCallback(lambda r: [ MailServer(r[0][1], r[1][1], username, hash_db) ][0] )
return dl
class ServerManager(object):
def __init__(self, mail_servers):
self.mail_servers = mail_servers
def get_server(self, accnt):
for ms in self.mail_servers:
if ms.account == accnt:
return succeed(ms)
def return_server(self):
# retrieve the size avialable on the servers
get_space_deferreds = []
for ms in self.mail_servers:
d = ms.get_space()
d.addCallback(lambda r: (ms, r))
get_space_deferreds.append(d)
dl = DeferredList(get_space_deferreds, consumeErrors = True)
dl.addCallback(self.parse_sizes)
return dl
def parse_sizes(self, results):
for no_error, result in results:
server = result[0]
result = result[1]
if no_error:# not an error so a potential server
for argument in result[0]:
if argument[0] == "QUOTA":
print "Argument"
print argument
print "/Argument"
used, total = argument[2][1:3]
available_kb = int(total) - int(used)
if available_kb > BLOCK_SIZE:# server with more then our block size
return server
else:
print "Error from account %s" % server.email_address
# no free space was found :-(
raise IOError("No free space was found.")
class MailServer(object):
"Manages a server"
size = 0
used_space = 0
def __init__(self, smtp_connection, imap_connection, email_address, hash_db = "hash.db"):
self.smtp_connection = smtp_connection
self.imap_connection = imap_connection
self.hash_database = HashDatabase(hash_db)
self.email_address = email_address
self.account = email_address
# current uploads
self.current_uploads = {}
# current downloads
self.current_downloads = {}
def get_space(self):
cmd = Command("GETQUOTAROOT", "INBOX", ["QUOTAROOT", "QUOTA"])
d = self.imap_connection.sendCommand(cmd)
return d
def upload_data(self, data):
"""
Uploads data to email server returns deferred that will return with the imap uid
"""
data_hash = hash(data)
if data_hash in self.current_uploads:
d = Deferred()
self.current_uploads[data_hash].append(d)
return d
if self.hash_database.hash_in_list(data_hash):
print "Data hash is in the database; not uploading"
return succeed(data_hash)
else:
d = Deferred()
self.current_uploads[data_hash] = [d]
id = "EMS Data ID: %s" % data_hash
connection_deferred = self.smtp_connection.send_email(self.email_address, self.email_address, id, id, [["raw_ems", "ems.dat", data] ])
connection_deferred.addCallback(self.upload_success, data_hash)
connection_deferred.addErrback(self.upload_error, data_hash)
connection_deferred.addBoth(self.notify_uploaders, data_hash)
return d
def notify_uploaders(self, result, data_hash):
for waitingDeferred in self.current_uploads.pop(data_hash):
# if r is a Failure, this is equivalent to calling .errback with
# that Failure.
waitingDeferred.callback(result)
def upload_success(self, result, data_hash):
# add to hash table
self.hash_database.add_hash(data_hash)
# immediatly searching doesn't seem to work so search on data retrieval
return data_hash
def upload_error(self, error, data_hash):
# upload error
log.msg("Erroring uploading file")
log.err(error)
return error # send error to uploader
def download_data(self, data_hash):
"""
Downloads data from the email server returns a deferred that will return with the data
"""
d = Deferred()
if data_hash in self.current_downloads:
self.current_downloads[data_hash].append(d)
return d
if not self.hash_database.hash_in_list(data_hash):
print "Data Hash has never been uploaded..."
raise IOError("No such data hash exists")
else:
self.current_downloads[data_hash] = [d]
id = "EMS Data ID: %s" % data_hash
connection_deferred = self.imap_connection.search("SUBJECT", "\"EMS Data ID: %s\"" % data_hash, uid = False)
connection_deferred.addCallback(self.process_download_uid)
connection_deferred.addErrback(self.download_error, data_hash)
connection_deferred.addBoth(self.notify_downloaders, data_hash)
return d
return d
def process_download_uid(self, id):
if len(id) == 0:
raise IOError("Hash not found, however database indicates it was uploaded")
d = self.imap_connection.fetchMessage(id[-1])
d.addCallback(self.process_download_attachment, id[-1])
return d
def process_download_attachment(self, data, id):
email_text = data[id]["RFC822"]
msg = email.message_from_string(email_text)
for part in msg.walk():
type = part.get_content_type()
print repr(type)
if "raw_ems" in type:
log.msg("Found Payload")
return part.get_payload(decode = True)
log.msg("No attachment found")
raise IOError("Data not found")
def download_error(self, error, data_hash):
log.msg("Error downloading file")
log.err(error)
return error
def notify_downloaders(self, result, data_hash):
for waitingDeferred in self.current_downloads.pop(data_hash):
# if r is a Failure, this is equivalent to calling .errback with
# that Failure.
waitingDeferred.callback(result)
def delete_data(self, data_hash):
if not self.hash_database.hash_in_list(data_hash):
raise IOError("No such data hash uploaded")
else:
# delete it to prevent anyone from trying to download it while it is being deleted
self.hash_database.delete_hash(data_hash)
d = self.imap_connection.search("SUBJECT", "\"EMS Data ID: %s\"" % data_hash, uid = False)
d.addCallback(self.delete_message)
d.addErrback(self.deletion_error, data_hash)
return d
def deletion_error(self, error, data_hash):
print "Deletion Error"
log.err(error)
# restore hash to database
self.hash_database.add_hash(data_hash)
raise IOError("Couldn't delete message hash")
def delete_message(self, id):
if len(id) == 0:
raise IOError("Hash not found, however database indicates it was uploaded")
d = self.imap_connection.setFlags(id[-1], ["\\Deleted"])
d.addCallback(lambda result: self.imap_connection.expunge())
return d
## Main Code ##
if __name__ == "__main__":
def deleted_email(result):
print "Deleted the email succesfully"
print "====Result===="
print result
print "====Result===="
print "Quiting..."
os._exit(0)
def error_deleting(error):
print "There was an error deleting the email"
error.printTraceback()
print "Quiting..."
os._exit(0)
def retrieved_data(result, ms, hash):
print "Retrieved data"
print "=====Data===="
print result
print "Deleting email"
d = ms.delete_data(hash)
d.addCallback(deleted_email)
d.addErrback(error_deleting)
return d
def email_retrieval_error(error):
print "There was an error retrieving the email"
error.printTraceback()
print "Quiting..."
os._exit(0)
def sent_email(hash, ms):
print "Sent email"
print "Waiting 3 minutes to make sure it isn't a simple delay with the email being relayed"
time.sleep(3 * 60)
print "Downloading Data..."
d = ms.download_data(hash)
d.addCallback(retrieved_data, ms, hash)
d.addErrback(email_retrieval_error)
return d
def email_sending_error(error):
print "There was an error sending the email"
error.printTraceback()
print "Quiting..."
os._exit(0)
def mail_server_created(ms):
# created mail server
print "Created mail server"
print "Sending email"
d = ms.upload_data("this is the attachment data I am sending to my email account")
d.addCallback(sent_email, ms)
d.addErrback(email_sending_error)
return d
def mail_server_error(error):
print "Error creating mail server"
error.printTraceback()
print "Quiting..."
os._exit(0)
# create mail server object
print "Creating mail server"
d = createMailServer("user@gmail.com", "password", "smtp.gmail.com:587", "imap.gmail.com:993", hash_db = "testhash.db")
d.addCallback(mail_server_created)
d.addCallback(mail_server_error)
from twisted.internet import reactor
reactor.run()
我在想我可能需要重新选择邮箱?我查看了 RFC3501 选择和搜索命令,没有发现任何关于这样的问题