python - Task scheduling with celery beat from CLI
I am having trouble figuring out how to pass arguments to a function that sets up tasks programmatically.
Basically, I don't know how to pass in either the 'path' or the 'jobs' parameter to setup_periodic_tasks below when launching celery beat from the CLI.
I am following the guide below to set up a script that schedules tasks: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
    import json
    import argparse

    import jumpman.taskflow.task.update_security_master as update_security_master
    from celery import Celery
    from celery.schedules import crontab
    from collections import namedtuple
    from jumpman import logger
    import settings

    # Celery app used for scheduling
    app = Celery('schedule_app')
    app.config_from_object(settings.celery_config_file)

    # Maps a process name to the callable that generates its canvas
    process_map = {
        'update_security_master': update_security_master.task
    }

    periodictask = namedtuple('periodictask', ['name', 'canvas', 'is_active', 'schedule', 'options'])


    def get_periodic_task(config):
        """Resolves the parameters of a periodic task."""
        name = config.get('name')
        config_path = config.get('config_path', '')
        schedule = crontab(**config.get('schedule'))
        is_active = bool(config.get('is_active'))
        options = config.get('options', {})

        # Create the signature
        f = process_map[name]
        if config_path:
            with open(config_path) as config_file:
                params = json.load(config_file)
        else:
            params = {}
        canvas = f(**params)

        task = periodictask(name=name, canvas=canvas, schedule=schedule,
                            is_active=is_active, options=options)
        return task


    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        """Schedule tasks from a path."""
        jobs = kwargs.get('jobs', {})
        path = kwargs.get('path', '')
        if path and not jobs:
            with open(path) as jobs_file:
                jobs = json.load(jobs_file)['jobs']

        for config in jobs:
            task = get_periodic_task(config)
            if task.is_active:
                sender.add_periodic_task(
                    schedule=task.schedule,
                    sig=task.canvas,
                    name=task.name,
                    options=task.options
                )
                logger.info('task {} configured run @ cron schedule {}'.format(task.name, str(task.schedule)))
            else:
                logger.info('task {} inactive.'.format(task.name))

        logger.info('periodic tasks scheduled.')
The update_security_master.task function returns a Celery canvas/signature. I'd like to pass in the 'path' to a JSON file (or the JSON file's 'jobs') that contains the arguments for calling update_security_master.task. I will add more tasks, and so more entries to the JSON file, and schedule those tasks the same way.
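For illustration, here is a rough sketch of the kind of job file described above. The keys are the ones get_periodic_task reads ('name', 'config_path', 'schedule', 'is_active', 'options'); the concrete values, the example variable, and the summarize_jobs helper are hypothetical and only there to show the shape.

```python
import json

# Hypothetical job file contents. The keys mirror what get_periodic_task() reads;
# the values are made up for illustration.
example = {
    "jobs": [
        {
            "name": "update_security_master",
            "config_path": "",  # empty: the task callable is invoked with no params
            "schedule": {"minute": "0", "hour": "6"},  # forwarded as crontab(**schedule)
            "is_active": True,
            "options": {},
        }
    ]
}


def summarize_jobs(document):
    """Return (name, is_active, schedule) for every entry under 'jobs'."""
    return [
        (job.get("name"), bool(job.get("is_active")), job.get("schedule"))
        for job in document["jobs"]
    ]


# Round-trip through JSON text, as if the document had been read from a file.
text = json.dumps(example, indent=2)
summary = summarize_jobs(json.loads(text))
print(summary)
```

Each new task would then be one more entry in the 'jobs' list, plus one more key in process_map.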
According to the guide above, I am trying to run the Python file above from the command line as below:
    celery -A jumpman.taskflow.taskflow beat --path 'jumpman/taskflow/task/config/master.json'
But this is not working.
Any suggestions for an alternative setup, if this is not the preferred way to set up task scheduling, would be highly appreciated!
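One possible alternative, offered only as a sketch: the on_after_configure handler only receives whatever Celery itself passes to it, so 'path' and 'jobs' are unlikely to ever show up in kwargs. The path could instead be read from an environment variable set when beat is launched. The variable name JUMPMAN_SCHEDULE_PATH and the load_jobs helper are hypothetical, not part of Celery.

```python
import json
import os
import tempfile

# Hypothetical variable name; Celery does not define or read it.
SCHEDULE_PATH_ENV = "JUMPMAN_SCHEDULE_PATH"


def load_jobs(environ=os.environ):
    """Return the 'jobs' list from the JSON file named by the env var, or [] if unset."""
    path = environ.get(SCHEDULE_PATH_ENV, "")
    if not path:
        return []
    with open(path) as handle:
        return json.load(handle).get("jobs", [])


# Small demonstration using a temporary file in place of master.json.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as handle:
    json.dump({"jobs": [{"name": "update_security_master", "is_active": True}]}, handle)
    demo_path = handle.name

demo_jobs = load_jobs({SCHEDULE_PATH_ENV: demo_path})
os.remove(demo_path)
print(demo_jobs)
```

Inside setup_periodic_tasks, `jobs = load_jobs()` would then replace the two kwargs.get lookups, and beat would be started without the extra option, for example `JUMPMAN_SCHEDULE_PATH=jumpman/taskflow/task/config/master.json celery -A jumpman.taskflow.taskflow beat`.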