您现在的位置是:首页 >技术杂谈 >Flask celery异步发送邮件网站首页技术杂谈

Flask celery异步发送邮件

季布, 2024-06-17 10:14:12
简介Flask celery异步发送邮件

首先安装celery

pip install celery

安装redis
一、Redis for Windows下载
之前微软维护了一份Windows版本的Redis,但是版本停留在3.2,并且也关闭了项目更新渠道。这里我们使用另外一位大神提供的Windows Redis,更新及时,用户量也很大。

下载地址为:https://github.com/tporadowski/redis/releases

我们选择下载Redis-x64-5.0.14.msi
在这里插入图片描述

二、安装Redis:
这里以图文的形式讲解Redis的安装过程。首先双击Redis-x64-5.0.14.msi,然后点击运行。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
三、Redis使用:
默认情况下,在安装完Redis后,就已经启动了Redis服务。如果想要手动启动Redis,则可以打开cmd,然后输入以下命令:
在这里插入图片描述
其中path/to/redis.windows.conf是存放redis.windows.conf的路径,默认是在Redis安装路径下。比如:
在这里插入图片描述
启动Redis后,可以通过redis-cli命令连接到Redis服务,然后输入命令操作了。比如:
在这里插入图片描述
至此我们就完成了Redis的安装和基本使用。

项目安装redis

pip install hiredis
pip install redis

启动命令

celery -A app.mycelery worker --loglevel=info

在这里插入图片描述
虽然启动了但是不起作用
另外在windows下运行celery需要安装一个第三方库gevent

pip install gevent
#重新启动
celery -A app.mycelery worker --loglevel=info -P gevent
# @bp.get("/email/captcha")
# def email_captcha():
#     # /email/captcha?email=xx@q.com
#     email = request.args.get('email')
#     if not email:
#         return jsonify({'code':400,'message':'请先传入邮箱!'})
#     # 产生随机六位数字
#     source = list(string.digits)
#     captcha = "".join(random.sample(source,6))
#     # subject 主题 recipients 接收者
#     message = Message(subject='【滴滴】注册验证码',recipients=[email],body='【滴滴】您的注册验证码为:%s'%captcha)
#     try:
#         mail.send(message)
#     except Exception as e:
#         print('邮件发送失败!')
#         print(e)
#         return jsonify({'code':500,'message':'邮件发送失败'})
#     return jsonify({'code': 200, 'message': '邮件发送成功'})


@bp.get("/email/captcha")
def email_captcha():
    # /email/captcha?email=xx@q.com
    email = request.args.get('email')
    if not email:
        return jsonify({'code':400,'message':'请先传入邮箱!'})
    # 产生随机六位数字
    source = list(string.digits)
    captcha = "".join(random.sample(source,6))
    subject = '【滴滴】注册验证码'
    body = '【滴滴】您的注册验证码为:%s' % captcha
    # subject 主题 recipients 接收者
    current_app.celery.send_task("send_mail",(email,subject,body))

    return jsonify({'code': 200, 'message': '邮件发送成功'})
from flask_mail import Message
from exts import mail
from celery import Celery


# 定义任务函数  发送给谁    主题     内容
def send_mail(recipient, subject, body):
    message = Message(subject=subject, recipients=[recipient], body=body)
    try:
        mail.send(message)
        return {"status": "SUCCESS"}
    except Exception:
        return {"status": "FAILURE"}


# 创建celery对象工厂函数
def make_celery(app):
    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    app.celery = celery

    # 添加任务
    celery.task(name="send_mail")(send_mail)

    return celery

在这里插入图片描述

发送邮件的同时缓存验证码
在这里插入图片描述

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。