Source code for raysect.core.workflow

# Copyright (c) 2014-2018, Dr Alex Meakins, Raysect Project
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     1. Redistributions of source code must retain the above copyright notice,
#        this list of conditions and the following disclaimer.
#
#     2. Redistributions in binary form must reproduce the above copyright
#        notice, this list of conditions and the following disclaimer in the
#        documentation and/or other materials provided with the distribution.
#
#     3. Neither the name of the Raysect Project nor the names of its
#        contributors may be used to endorse or promote products derived from
#        this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from multiprocessing import Process, cpu_count, SimpleQueue
from raysect.core.math import random


[docs]class RenderEngine: """ Provides a common rendering workflow interface. This is a base class, its functionality must be implemented fully by the deriving class. This class provides a rendering workflow that abstracts away the underlying system performing the work. It is intended that render engines may be built that provide rendering on single cores, multi-cores (SMP) and clusters. The basic workflow is as follows. The render task is split into small, self-contained chunks of work - 'tasks'. These tasks are passed to the render engine which distributes the work to the available computing resources. These discrete computing resources are know as "workers". Workers process one task at a time and return their result to the render engine. When results are received the render engine assembles them into the final result. This workflow is implemented by supplying a set of tasks and two methods to the render engines' run() method which processes those tasks. The functions supplied to the run() method may be given additional args and kwargs. A worker calls render for each task object received. render has the following signature: :: def render(task, *render_args, **render_kwargs) where args and kwargs are additional arguments supplied by the user. Similarly, the worker calls update() for the results generated by a call to render(). Update() has the following signature: :: def update(results, *update_args, **update_kwargs) where args and kwargs are additional arguments supplied by the user. The render() function must return an object representing the results, this must be a picklable python object. """
[docs] def run(self, tasks, render, update, render_args=(), render_kwargs={}, update_args=(), update_kwargs={}): """ Starts the render engine executing the requested tasks. :param list tasks: List of user defined tuples that describe the task to execute. :param object render: Callable python object that executes the tasks. :param object update: Callable python object that is called following a render task and must be used to update the internal state of the object requesting work. :param tuple render_args: Additional arguments to pass to user defined render function. :param tuple render_kwargs: Additional keyword arguments to pass to user defined render function. :param tuple update_args: Additional arguments to pass to user defined update function. :param tuple update_kwargs: Additional keyword arguments to pass to user defined update function. """ raise NotImplementedError("Virtual method must be implemented in sub-class.")
[docs] def worker_count(self): """ Returns the number of workers in use by this engine. """ raise NotImplementedError("Virtual method must be implemented in sub-class.")
[docs]class SerialEngine(RenderEngine): """ Render engine for running on a single CPU processor. This engine is useful for debugging. >>> from raysect.core import SerialEngine >>> from raysect.optical.observer import PinholeCamera >>> >>> camera = PinholeCamera((512, 512)) >>> camera.render_engine = SerialEngine() """ def run(self, tasks, render, update, render_args=(), render_kwargs={}, update_args=(), update_kwargs={}): for task in tasks: result = render(task, *render_args, **render_kwargs) update(result, *update_args, **update_kwargs) def worker_count(self): return 1
[docs]class MulticoreEngine(RenderEngine): """ A render engine for distributing work across multiple CPU cores. The number of processes spawned by this render engine is controlled via the processes attribute. This can also be set at object initialisation. If the processes attribute is set to None (the default), the render engine will automatically set the number pf processes to be equal to the number of CPU cores detected on the machine. :param processes: The number of worker processes, or None to use all available cores (default). .. code-block:: pycon >>> from raysect.core import MulticoreEngine >>> from raysect.optical.observer import PinholeCamera >>> >>> camera = PinholeCamera((512, 512)) >>> >>> # allowing the camera to use all available CPU cores. >>> camera.render_engine = MulticoreEngine() >>> >>> # or forcing the render engine to use a specific number of CPU processes >>> camera.render_engine = MulticoreEngine(processes=8) """ def __init__(self, processes=None): super().__init__() self._processes = -1 self.processes = processes @property def processes(self): return self._processes @processes.setter def processes(self, value): if value is None: self._processes = cpu_count() else: value = int(value) if value <= 0: raise ValueError('Number of concurrent worker processes must be greater than zero.') self._processes = value def run(self, tasks, render, update, render_args=(), render_kwargs={}, update_args=(), update_kwargs={}): # establish ipc queues using a manager process task_queue = SimpleQueue() result_queue = SimpleQueue() # start process to generate image samples producer = Process(target=self._producer, args=(tasks, task_queue)) producer.start() # start worker processes workers = [] for pid in range(self._processes): p = Process(target=self._worker, args=(render, render_args, render_kwargs, task_queue, result_queue)) p.start() workers.append(p) # consume results for _ in tasks: result = result_queue.get() update(result, *update_args, **update_kwargs) # shutdown workers for _ in workers: task_queue.put(None) def worker_count(self): return self._processes def _producer(self, tasks, task_queue): for task in tasks: task_queue.put(task) def _worker(self, render, args, kwargs, task_queue, result_queue): # re-seed the random number generator to prevent all workers inheriting the same sequence random.seed() # process tasks while True: task = task_queue.get() # have we been commanded to shutdown? if task is None: break result = render(task, *args, **kwargs) result_queue.put(result)
if __name__ == '__main__': from time import time class Job: def __init__(self, engine=None): self.total = 0 self.engine = engine if engine else MulticoreEngine() def run(self, v): self.total = 0 self.engine.run(list(range(v)), self.render, self.update, render_args=(10000,)) return self.total def render(self, task, count): sum = 0 for i in range(count): sum += 1 / count return sum def update(self, result): self.total += result n = 2000 t = time() j = Job(SerialEngine()) print(j.run(n), time() - t) t = time() j = Job(MulticoreEngine()) print(j.run(n), time() - t)