A Note on HPO package hpbandster
Let’s understand this package by going through the provided example. For simplicity, we begin with random search optimizer rather than the BOHB employed in this example.
Preliminary
We must understand python’s threading.Condition
before diving into the implementation of hpbandster.
At first, threading.Condition
has methods acquire()
and release()
and obeys the context management protocol:
All of the objects provided by this module (i.e., threading) that have acquire() and release() methods can be used as context managers for a with statement. The acquire() method will be called when the block is entered, and release() will be called when the block is exited.
Please see ~python document for more details.
The architecture of optimizers
At first, the base class for all these optimizers is Master
class (see hpbandster/core/master.py), which utilizes a Dispatcher
object for
assigning tasks to free workers, report results back to the master and communicate to the nameserver.
Let’s see how it achieves this from the construction:
self.dispatcher = Dispatcher( self.job_callback, queue_callback=self.adjust_queue_size, run_id=run_id, ping_interval=ping_interval, nameserver=nameserver, nameserver_port=nameserver_port, host=host)
The first argument is Master
’s method job_callback()
, which takes in a Job
object (see hpbandster/core/dispatcher.py) once that job is finished, and do some “book keeping”, e.g., self.num_running_jobs -= 1
, self.iterations[job.id[0]].register_result(job)
, self.config_generator.new_result(job)
, and self.thread_cond.notify()
.
The argument queue_callback
is specified as Master
’s method adjust_queue_size()
, which
gets called with the number of workers in the pool on every update-cycle
It accordingly updates Master
’s job_queue_sizes
attribute and then notify all the threads waiting for the condition (i.e., self.thread_cond.notify_all()
).
The run()
method of Dispatcher
object is immediately called after its instantiation, which triggers two threads: one runs discover_workers()
and the other runs job_runner()
.
Going through the optimization procedure
The run()
method of Master
is the entry of the whole optimization procedure:
def run(self, n_iterations=1, min_n_workers=1, iteration_kwargs = {},):
"""
run n_iterations of SuccessiveHalving
Parameters
----------
n_iterations: int
number of iterations to be performed in this run
min_n_workers: int
minimum number of workers before starting the run
"""
self.wait_for_workers(min_n_workers)
iteration_kwargs.update({'result_logger': self.result_logger})
if self.time_ref is None:
self.time_ref = time.time()
self.config['time_ref'] = self.time_ref
self.logger.info('HBMASTER: starting run at %s'%(str(self.time_ref)))
self.thread_cond.acquire()
while True:
self._queue_wait()
next_run = None
# find a new run to schedule
for i in self.active_iterations():
next_run = self.iterations[i].get_next_run()
if not next_run is None: break
if not next_run is None:
self.logger.debug('HBMASTER: schedule new run for iteration %i'%i)
self._submit_job(*next_run)
continue
else:
if n_iterations > 0: #we might be able to start the next iteration
self.iterations.append(self.get_next_iteration(len(self.iterations), iteration_kwargs))
n_iterations -= 1
continue
# at this point there is no imediate run that can be scheduled,
# so wait for some job to finish if there are active iterations
if self.active_iterations():
self.thread_cond.wait()
else:
break
self.thread_cond.release()
for i in self.warmstart_iteration:
i.fix_timestamps(self.time_ref)
ws_data = [i.data for i in self.warmstart_iteration]
return Result([copy.deepcopy(i.data) for i in self.iterations] + ws_data, self.config)
wait_for_workers()
blocks the execution until there is enough free workers, where the self.thread_cond.wait(1)
will be notified by the self.thread_cond.notify()
in job_callback()
.
self.result_logger
is used to make live logging (more details can be found here).
self.time_ref
is set to be None
in the constructor of Master
and thus becomes the current moment here.
self.thread_cond
is an object of python’s threading.Condition
, which is used for coordinating the threads.
At the first time we enter the while
loop, the _queue_wait()
method will not block the execution, as job_queue_sizes
has been changed from (-1, 0)
to (0, 1)
by adjust_queue_size()
called by discover_workers()
, where there is one worker in this example.
self.iterations
is a list intending to hold n_iterations
iterations (each is a SuccessiveHalving
object).
At the first time we enter the while
loop, active_iterations()
cannot find any active iteration. Thus, next_run
is None
, and the SuccessiveHalving
object is returned by the get_next_iteration()
method.
By continue
, we enter the while
loop again and come to the first for
loop.
The base class of SuccessiveHalving
is BaseIteration
class (see hpbandster/core/base_iteration.py), which has attribute is_finished
(False
by its construction). active_iterations()
returns [0]
(i.e., implying that the first iteration is active), and this line next_run = self.iterations[i].get_next_run()
is executed.
By this calling, then the SuccessiveHalving
object returns a tuple consisting of:
config_id
: a tuple where the first element is the iteration index of theMaster
, the second element is the stage index of theSuccessiveHalving
object (starting from zero), and the config index among the considered configs at this stage.- the config
- the budget assigned to this config
Then the returned value is fed into this method self._submit_job(*next_run)
so that the dispatcher can submit the job to the nameserver, where num_running_jobs
is increased by one.
By continue
, we enter the while
loop again. The _queue_wait()
method will block the execution untill the submitted job has finished, that is, num_running_jobs
becomes zero by the update made by job_callback()
.
The procedure goes on in this way.