我正在使用 Telepot 制作一个电报机器人作为“医疗”调查。我已经完成了机器人并且它正在工作。现在我希望这个机器人在一天中的特定时间或 24 小时后自动发送消息,以提醒用户开始调查。或者更好的是,在一天中的某个时间自动为用户启动调查。但是我一直在安排消息...
由于一些奇怪的原因,我一次收到大约 10 条消息......
我正在使用 apscheduler 包。我确实设法自动化了提醒消息,但由于 Telepot 的工作方式,用户的每个响应都被输入到函数中,导致提醒消息在调查结束后出现不止一次。我希望提醒消息每天只出现一次,以提醒用户开始调查。如果有人可以帮助我解决日程安排,我会很高兴。先感谢您。
import time
import telepot
import json
from telepot.loop import MessageLoop
from telepot.namedtuple import ReplyKeyboardMarkup # for custom keyboard
from telepot.delegate import pave_event_space, per_chat_id, create_open
from apscheduler.schedulers.background import BackgroundScheduler
## Load token
TOKEN = 'klaghklsfhglka'
# The main body
class main(telepot.helper.ChatHandler): # Implement continuous dialogue with user using DelegatorBot
def __init__(self, *args, **kwargs):
super(main, self).__init__(*args, **kwargs)
# Initialize and create empty dictionary for storing data.
self.indicator = '0'
self.data = {} # initialize an empty dictionary for storing data.
def on_chat_message(self, msg):
content_type, chat_type, chat_id = telepot.glance(msg) # this is very important.
# Debugging
print(content_type, chat_type, chat_id)
print(msg['text'])
schedule = BackgroundScheduler()
def repeat_message():
bot.sendMessage(chat_id, text='type /survey to repeat survey.')
schedule.add_job(repeat_message, 'interval', minutes=1)
schedule.start() # Initiates the scheduler to send automated message.
# Survey function
def survey():
# Symptoms
if self.indicator == 'choose_symptom':
# show custom keyboard
mark_up = ReplyKeyboardMarkup(keyboard=[['Pain'], ['Trouble with Breathing'], ['Tiredness'], ['Appetite'], ['Feeling sick'], ['Vomitting'], ['Bowel upset'], ['Hair loss']], one_time_keyboard=True)
# keyboard options are defined in the form of arrays. Text of each array element will appear on keyboard.
bot.sendMessage(chat_id, text='Was there any discomfort today?', reply_markup=mark_up) # Ask user question
self.indicator = 'rate_symptom' # move to next question
elif self.indicator == 'rate_symptom':
self.data['symptoms'] = msg['text'] # after moving to next section, this line of code stores the previous reply to the self.data dictionary.
mark_up = ReplyKeyboardMarkup(keyboard=[['Not at all'],['A little'],['Somewhat'],['Very much']], one_time_keyboard=True)
bot.sendMessage(chat_id, text='Please rate the feeling.', reply_markup=mark_up)
self.indicator = 'set_data' # move to next section
# Set data
# Store data in json file. This chunk of code should be at the end of everything.
elif self.indicator == 'set_data':
self.data['symptoms_score'] = msg['text']
self.data['date'] = time.ctime(msg['date']) # convert the time from long to normal format
with open('data.json', 'a') as handle: # stores all input into a .json file.
json.dump(self.data, handle)
handle.write("\n")
handle.close()
bot.sendMessage(chat_id, text='Thank you for feedback.')
return
# Detect start messages.
if msg['text'] == '/start': # /starts initiates bot. User gets greeted with a welcome message describing what the bot will do.
bot.sendMessage(chat_id,
text='Hello.',
parse_mode='Markdown')
elif msg['text'] == '/survey': # /survey initiates survey.
self.indicator = 'choose_symptom'
print("Survey started...")
survey() #starts survey
bot = telepot.DelegatorBot(TOKEN, [pave_event_space()(per_chat_id(), create_open, main, timeout=30),])
MessageLoop(bot).run_as_thread()
print('Listening...')
while 1:
time.sleep(10)