在Celery中动态添加任务可以通过以下步骤实现:
from celery import Celery
app = Celery(name)
@app.task def add(x, y): return x + y
def create_task(task_name, x, y): """ 动态创建任务 :param task_name: 任务名 :param x: :param y: :return: """ new_task = app.task(name=task_name, bind=True)(add.s(x, y)) app.register_task(new_task) return new_task
dynamic_task = create_task('dynamic_task', 1, 2) dynamic_task.delay()