python - Task scheduling with celery beat from CLI -


i having trouble figuring out how pass in arguments function sets task programmatically.

basically, don't know how pass in either 'path' or 'jobs' parameters setup_periodic_tasks below; when launching celery beat cli.

i following guide below set script schedule tasks http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

import json import argparse import jumpman.taskflow.task.update_security_master update_security_master celery import celery celery.schedules import crontab collections import namedtuple jumpman import logger import settings  # celery app used scheduling app = celery('schedule_app') app.config_from_object(settings.celery_config_file)  # maps process name callable generates canvas process_map = {     'update_security_master': update_security_master.task }  periodictask = namedtuple('periodictask', ['name', 'canvas', 'is_active', 'schedule', 'options'])   def get_periodic_task(config):     """resolves parameters periodic task."""     name = config.get('name')     config_path = config.get('config_path', '')     schedule = crontab(**config.get('schedule'))     is_active = bool(config.get('is_active'))     options = config.get('options', {})      # create signature     f = process_map[name]     if config_path:         params = json.load(open(config_path))     else:         params = {}     canvas = f(**params)     task = periodictask(name=name, canvas=canvas, schedule=schedule, is_active=is_active, options=options)     return task   @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):     """schedule tasks path."""     jobs = kwargs.get('jobs', {})     path = kwargs.get('path', '')     if path , not jobs:         jobs = json.load(open(path))['jobs']      config in jobs:         task = get_periodic_task(config)         if task.is_active:             sender.add_periodic_task(                                     schedule=task.schedule,                                     sig=task.canvas,                                     name=task.name,                                     options=task.options                                     )             logger.info('task {} configured run @ cron schedule {}'.format(task.name, str(task.schedule)))         else:             logger.info('task {} inactive.'.format(task.name))     logger.info('periodic tasks scheduled.') 

the update_security_master.task function return celery canvas/signature. i'd pass in 'path' json file (or json file 'jobs') contains arguments calling update_security_master.task. add more tasks, , more entries json file scheduling tasks way.

according guide above, trying call above python file command line below:

celery -a jumpman.taskflow.taskflow beat --path 'jumpman/taskflow/task/config/master.json' 

but not working.

any suggestions alternative setup if not preferred way set tasks scheduling highly appreciated!


Comments

Popular posts from this blog

android - InAppBilling registering BroadcastReceiver in AndroidManifest -

python Tkinter Capturing keyboard events save as one single string -

sql server - Why does Linq-to-SQL add unnecessary COUNT()? -