Skip to content
Snippets Groups Projects
Commit 1b0bfd10 authored by Vo Minh Thu's avatar Vo Minh Thu
Browse files

[IMP] cron: alternative implementation:

- The previous implementation was optimized to make few queries to the database
  but needed to keep some internal state.
- That state was updated whenever the ir_cron table was modified by the ORM
  (this works only when the cron and web processes/threads are inside a single
  OpenERP server instance).
- The new implementation is instead polling the database.
- This is deemed acceptable in `normal` situation (i.e. not a SaaS with
  thousand of databases).
- This makes it possible to avoid sharing state or the use of IPC.
- This makes it possible to add/remove additional worker processes,
  possibly on different machines.
- The code of the older implementation is removed in this commit but
  will be added back in a later commit: this is the 6.1 stable branch
  and we don't want to change the existing installation, but simply
  provide a solution for those running OpenERP with Gunicorn (which
  uses processes for which no cron state were shared).

bzr revid: vmt@openerp.com-20120328090320-vshsfv3gt1ck34s1
parent 91ff283a
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
OpenERP cron jobs worker
This script executes OpenERP cron jobs. Normally, cron jobs are handled by the
OpenERP server but depending on deployment needs, independent worker processes
can be used.
"""
import openerp
if __name__ == '__main__':
openerp.modules.module.initialize_sys_path()
openerp.modules.loading.open_openerp_namespace()
openerp.tools.config['log_handler'] = ['openerp.addons.base.ir.ir_cron:DEBUG']
openerp.netsvc.init_logger()
import openerp.addons.base
openerp.addons.base.ir.ir_cron.ir_cron._run(['xx'])
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
...@@ -93,9 +93,6 @@ def preload_registry(dbname): ...@@ -93,9 +93,6 @@ def preload_registry(dbname):
""" Preload a registry, and start the cron.""" """ Preload a registry, and start the cron."""
try: try:
db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False) db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
# jobs will start to be processed later, when openerp.cron.start_master_thread() is called by openerp.service.start_services()
registry.schedule_cron_jobs()
except Exception: except Exception:
_logger.exception('Failed to initialize database `%s`.', dbname) _logger.exception('Failed to initialize database `%s`.', dbname)
......
...@@ -31,7 +31,6 @@ import netsvc ...@@ -31,7 +31,6 @@ import netsvc
import openerp import openerp
import pooler import pooler
import tools import tools
from openerp.cron import WAKE_UP_NOW
from osv import fields, osv from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval from tools.safe_eval import safe_eval as eval
...@@ -142,17 +141,15 @@ class ir_cron(osv.osv): ...@@ -142,17 +141,15 @@ class ir_cron(osv.osv):
except Exception, e: except Exception, e:
self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e) self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
def _run_job(self, cr, job, now): def _run_job(self, cr, job):
""" Run a given job taking care of the repetition. """ Run a given job taking care of the repetition.
The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this The cursor has a lock on the job (aquired by _acquire_job()).
method is run in a worker thread (spawned by _run_jobs_multithread())).
:param job: job to be run (as a dictionary). :param job: job to be run (as a dictionary).
:param now: timestamp (result of datetime.now(), no need to call it multiple time).
""" """
try: try:
now = datetime.utcnow()
nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT) nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
numbercall = job['numbercall'] numbercall = job['numbercall']
...@@ -171,45 +168,29 @@ class ir_cron(osv.osv): ...@@ -171,45 +168,29 @@ class ir_cron(osv.osv):
cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
(nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id'])) (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
if numbercall:
# Reschedule our own main cron thread if necessary.
# This is really needed if this job runs longer than its rescheduling period.
nextcall = calendar.timegm(nextcall.timetuple())
openerp.cron.schedule_wakeup(nextcall, cr.dbname)
finally: finally:
cr.commit() cr.commit()
cr.close() cr.close()
openerp.cron.release_thread_slot()
def _run_jobs_multithread(self): @classmethod
def _acquire_job(cls, db_name):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads. """ Try to process one cron job.
This selects in database all the jobs that should be processed. It then This selects in database all the jobs that should be processed. It then
tries to lock each of them and, if it succeeds, spawns a thread to run tries to lock each of them and, if it succeeds, run
the cron job (if it doesn't succeed, it means the job was already the cron job (if it doesn't succeed, it means the job was already
locked to be taken care of by another thread). locked to be taken care of by another thread) and stop.
The cursor used to lock the job in database is given to the worker
thread (which has to close it itself).
""" """
db = self.pool.db db = openerp.sql_db.db_connect(db_name)
cr = db.cursor() cr = db.cursor()
db_name = db.dbname
try: try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
now = datetime.now()
# Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
cr.execute("""SELECT * FROM ir_cron cr.execute("""SELECT * FROM ir_cron
WHERE numbercall != 0 WHERE numbercall != 0
AND active AND nextcall <= (now() at time zone 'UTC') AND active AND nextcall <= (now() at time zone 'UTC')
ORDER BY priority""") ORDER BY priority""")
for job in cr.dictfetchall(): for job in cr.dictfetchall():
if not openerp.cron.get_thread_slots():
break
jobs[job['id']] = job
task_cr = db.cursor() task_cr = db.cursor()
try: try:
# Try to grab an exclusive lock on the job row from within the task transaction # Try to grab an exclusive lock on the job row from within the task transaction
...@@ -233,31 +214,11 @@ class ir_cron(osv.osv): ...@@ -233,31 +214,11 @@ class ir_cron(osv.osv):
# we're exiting due to an exception while acquiring the lot # we're exiting due to an exception while acquiring the lot
task_cr.close() task_cr.close()
# Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock # Got the lock on the job row, run its code
task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now)) _logger.debug('Starting job `%s`.', job['name'])
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) registry = openerp.pooler.get_pool(db_name)
task_thread.setDaemon(False) registry[cls._name]._run_job(task_cr, job)
openerp.cron.take_thread_slot() return True
task_thread.start()
_logger.debug('Cron execution thread for job `%s` spawned', job['name'])
# Find next earliest job ignoring currently processed jobs (by this and other cron threads)
find_next_time_query = """SELECT min(nextcall) AS min_next_call
FROM ir_cron WHERE numbercall != 0 AND active"""
if jobs:
cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
else:
cr.execute(find_next_time_query)
next_call = cr.dictfetchone()['min_next_call']
if next_call:
next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
else:
# no matching cron job found in database, re-schedule arbitrarily in 1 day,
# this delay will likely be modified when running jobs complete their tasks
next_call = time.time() + (24*3600)
openerp.cron.schedule_wakeup(next_call, db_name)
except Exception, ex: except Exception, ex:
_logger.warning('Exception in cron:', exc_info=True) _logger.warning('Exception in cron:', exc_info=True)
...@@ -266,18 +227,21 @@ class ir_cron(osv.osv): ...@@ -266,18 +227,21 @@ class ir_cron(osv.osv):
cr.commit() cr.commit()
cr.close() cr.close()
def update_running_cron(self, cr): return False
""" Schedule as soon as possible a wake-up for this database. """
# Verify whether the server is already started and thus whether we need to commit @classmethod
# immediately our changes and restart the cron agent in order to apply the change def _run(cls, db_names):
# immediately. The commit() is needed because as soon as the cron is (re)started it while True:
# will query the database with its own cursor, possibly before the end of the t1 = time.time()
# current transaction. for db_name in db_names:
# This commit() is not an issue in most cases, but we must absolutely avoid it while(cls._acquire_job(db_name)):
# when the server is only starting or loading modules (hence the test on pool._init). pass
if not self.pool._init: t2 = time.time()
cr.commit() t = t2 - t1
openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname) if t > 60:
_logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
else:
time.sleep(60 - t)
def _try_lock(self, cr, uid, ids, context=None): def _try_lock(self, cr, uid, ids, context=None):
"""Try to grab a dummy exclusive write-lock to the rows with the given ids, """Try to grab a dummy exclusive write-lock to the rows with the given ids,
...@@ -294,20 +258,16 @@ class ir_cron(osv.osv): ...@@ -294,20 +258,16 @@ class ir_cron(osv.osv):
def create(self, cr, uid, vals, context=None): def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context) res = super(ir_cron, self).create(cr, uid, vals, context=context)
self.update_running_cron(cr)
return res return res
def write(self, cr, uid, ids, vals, context=None): def write(self, cr, uid, ids, vals, context=None):
self._try_lock(cr, uid, ids, context) self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).write(cr, uid, ids, vals, context=context) res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
self.update_running_cron(cr)
return res return res
def unlink(self, cr, uid, ids, context=None): def unlink(self, cr, uid, ids, context=None):
self._try_lock(cr, uid, ids, context) self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).unlink(cr, uid, ids, context=context) res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
self.update_running_cron(cr)
return res return res
ir_cron()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
...@@ -49,167 +49,9 @@ import tools ...@@ -49,167 +49,9 @@ import tools
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not originally about loading
# a database, although having the database name in the queue will
# cause it to be loaded when the schedule time is reached, even if it was
# unloaded in the mean time. Normally a database's wake-up is cancelled by
# the RegistryManager when the database is unloaded - so this should not
# cause it to be reloaded.
#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs # TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been # that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something) # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
#
# Each element is a triple (timestamp, database-name, boolean). The boolean
# specifies if the wake-up is canceled (so a wake-up can be canceled without
# relying on the heapq implementation detail; no need to remove the job from
# the heapq).
_wakeups = []
# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
# invariant: lookup the wake-up by database-name, then set
# its third element to True.
_wakeup_by_db = {}
# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
# We could use a simple (non-reentrant) lock if the runner function below
# was more fine-grained, but we are fine with the loop owning the lock
# while spawning a few threads.
_wakeups_lock = threading.RLock()
# Maximum number of threads allowed to process cron jobs concurrently. This
# variable is set by start_master_thread using openerp.conf.max_cron_threads.
_thread_slots = None
# A (non re-entrant) lock to protect the above _thread_slots variable.
_thread_slots_lock = threading.Lock()
# Sleep duration limits - must not loop too quickly, but can't sleep too long
# either, because a new job might be inserted in ir_cron with a much sooner
# execution date than current known ones. We won't see it until we wake!
MAX_SLEEP = 60 # 1 min
MIN_SLEEP = 1 # 1 sec
# Dummy wake-up timestamp that can be used to force a database wake-up asap
WAKE_UP_NOW = 1
def get_thread_slots():
""" Return the number of available thread slots """
return _thread_slots
def release_thread_slot():
""" Increment the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots += 1
def take_thread_slot():
""" Decrement the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots -= 1
def cancel(db_name):
""" Cancel the next wake-up of a given database, if any.
:param db_name: database name for which the wake-up is canceled.
"""
_logger.debug("Cancel next wake-up for database '%s'.", db_name)
with _wakeups_lock:
if db_name in _wakeup_by_db:
_wakeup_by_db[db_name][2] = True
def cancel_all():
""" Cancel all database wake-ups. """
_logger.debug("Cancel all database wake-ups")
global _wakeups
global _wakeup_by_db
with _wakeups_lock:
_wakeups = []
_wakeup_by_db = {}
def schedule_wakeup(timestamp, db_name):
""" Schedule a new wake-up for a database.
If an earlier wake-up is already defined, the new wake-up is discarded.
If another wake-up is defined, that wake-up is discarded and the new one
is scheduled.
:param db_name: database name for which a new wake-up is scheduled.
:param timestamp: when the wake-up is scheduled.
"""
if not timestamp:
return
with _wakeups_lock:
if db_name in _wakeup_by_db:
task = _wakeup_by_db[db_name]
if not task[2] and timestamp > task[0]:
# existing wakeup is valid and occurs earlier than new one
return
task[2] = True # cancel existing task
task = [timestamp, db_name, False]
heapq.heappush(_wakeups, task)
_wakeup_by_db[db_name] = task
_logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
'NOW' if timestamp == WAKE_UP_NOW else timestamp)
def runner():
"""Neverending function (intended to be run in a dedicated thread) that
checks every 60 seconds the next database wake-up. TODO: make configurable
"""
while True:
runner_body()
def runner_body():
with _wakeups_lock:
while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
task = heapq.heappop(_wakeups)
timestamp, db_name, canceled = task
if canceled:
continue
del _wakeup_by_db[db_name]
registry = openerp.pooler.get_pool(db_name)
if not registry._init:
_logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
registry['ir.cron']._run_jobs_multithread()
amount = MAX_SLEEP
with _wakeups_lock:
# Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
if _wakeups and get_thread_slots():
amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
_logger.debug("Going to sleep for %ss", amount)
time.sleep(amount)
def start_master_thread():
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
"""
global _thread_slots
_thread_slots = openerp.conf.max_cron_threads
db_maxconn = tools.config['db_maxconn']
if _thread_slots >= tools.config.get('db_maxconn', 64):
_logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
"this may cause trouble if you reach that number of parallel cron tasks.",
db_maxconn, _thread_slots)
if _thread_slots:
t = threading.Thread(target=runner, name="openerp.cron.master_thread")
t.setDaemon(True)
t.start()
_logger.debug("Master cron daemon started!")
else:
_logger.info("No master cron daemon (0 workers needed).")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
...@@ -27,7 +27,6 @@ import threading ...@@ -27,7 +27,6 @@ import threading
import openerp.sql_db import openerp.sql_db
import openerp.osv.orm import openerp.osv.orm
import openerp.cron
import openerp.tools import openerp.tools
import openerp.modules.db import openerp.modules.db
import openerp.tools.config import openerp.tools.config
...@@ -110,15 +109,6 @@ class Registry(object): ...@@ -110,15 +109,6 @@ class Registry(object):
return res return res
def schedule_cron_jobs(self):
""" Make the cron thread care about this registry/database jobs.
This will initiate the cron thread to check for any pending jobs for
this registry/database as soon as possible. Then it will continuously
monitor the ir.cron model for future jobs. See openerp.cron for
details.
"""
openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
def clear_caches(self): def clear_caches(self):
""" Clear the caches """ Clear the caches
This clears the caches associated to methods decorated with This clears the caches associated to methods decorated with
...@@ -213,9 +203,6 @@ class RegistryManager(object): ...@@ -213,9 +203,6 @@ class RegistryManager(object):
finally: finally:
cr.close() cr.close()
if pooljobs:
registry.schedule_cron_jobs()
return registry return registry
@classmethod @classmethod
...@@ -233,8 +220,6 @@ class RegistryManager(object): ...@@ -233,8 +220,6 @@ class RegistryManager(object):
if db_name in cls.registries: if db_name in cls.registries:
cls.registries[db_name].clear_caches() cls.registries[db_name].clear_caches()
del cls.registries[db_name] del cls.registries[db_name]
openerp.cron.cancel(db_name)
@classmethod @classmethod
def delete_all(cls): def delete_all(cls):
......
...@@ -28,7 +28,6 @@ import netrpc_server ...@@ -28,7 +28,6 @@ import netrpc_server
import web_services import web_services
import websrv_lib import websrv_lib
import openerp.cron
import openerp.modules import openerp.modules
import openerp.netsvc import openerp.netsvc
import openerp.osv import openerp.osv
...@@ -51,7 +50,7 @@ _logger = logging.getLogger(__name__) ...@@ -51,7 +50,7 @@ _logger = logging.getLogger(__name__)
def start_services(): def start_services():
""" Start all services. """ Start all services.
Services include the different servers and cron threads. Services include the different servers.
""" """
# Instantiate local services (this is a legacy design). # Instantiate local services (this is a legacy design).
...@@ -64,9 +63,6 @@ def start_services(): ...@@ -64,9 +63,6 @@ def start_services():
#http_server.init_static_http() #http_server.init_static_http()
netrpc_server.init_servers() netrpc_server.init_servers()
# Start the main cron thread.
openerp.cron.start_master_thread()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC). # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll() openerp.netsvc.Server.startAll()
...@@ -76,9 +72,6 @@ def start_services(): ...@@ -76,9 +72,6 @@ def start_services():
def stop_services(): def stop_services():
""" Stop all services. """ """ Stop all services. """
# stop scheduling new jobs; we will have to wait for the jobs to complete below
openerp.cron.cancel_all()
openerp.netsvc.Server.quitAll() openerp.netsvc.Server.quitAll()
openerp.wsgi.core.stop_server() openerp.wsgi.core.stop_server()
config = openerp.tools.config config = openerp.tools.config
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment