python - Task scheduling with celery beat from CLI -


i having trouble figuring out how pass in arguments function sets task programmatically.

basically, don't know how pass in either 'path' or 'jobs' parameters setup_periodic_tasks below; when launching celery beat cli.

i following guide below set script schedule tasks http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

import json import argparse import jumpman.taskflow.task.update_security_master update_security_master celery import celery celery.schedules import crontab collections import namedtuple jumpman import logger import settings  # celery app used scheduling app = celery('schedule_app') app.config_from_object(settings.celery_config_file)  # maps process name callable generates canvas process_map = {     'update_security_master': update_security_master.task }  periodictask = namedtuple('periodictask', ['name', 'canvas', 'is_active', 'schedule', 'options'])   def get_periodic_task(config):     """resolves parameters periodic task."""     name = config.get('name')     config_path = config.get('config_path', '')     schedule = crontab(**config.get('schedule'))     is_active = bool(config.get('is_active'))     options = config.get('options', {})      # create signature     f = process_map[name]     if config_path:         params = json.load(open(config_path))     else:         params = {}     canvas = f(**params)     task = periodictask(name=name, canvas=canvas, schedule=schedule, is_active=is_active, options=options)     return task   @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):     """schedule tasks path."""     jobs = kwargs.get('jobs', {})     path = kwargs.get('path', '')     if path , not jobs:         jobs = json.load(open(path))['jobs']      config in jobs:         task = get_periodic_task(config)         if task.is_active:             sender.add_periodic_task(                                     schedule=task.schedule,                                     sig=task.canvas,                                     name=task.name,                                     options=task.options                                     )             logger.info('task {} configured run @ cron schedule {}'.format(task.name, str(task.schedule)))         else:             logger.info('task {} inactive.'.format(task.name))     logger.info('periodic tasks scheduled.') 

the update_security_master.task function return celery canvas/signature. i'd pass in 'path' json file (or json file 'jobs') contains arguments calling update_security_master.task. add more tasks, , more entries json file scheduling tasks way.

according guide above, trying call above python file command line below:

celery -a jumpman.taskflow.taskflow beat --path 'jumpman/taskflow/task/config/master.json' 

but not working.

any suggestions alternative setup if not preferred way set tasks scheduling highly appreciated!


Comments

Popular posts from this blog

python Tkinter Capturing keyboard events save as one single string -

android - InAppBilling registering BroadcastReceiver in AndroidManifest -

javascript - Z-index in d3.js -