pythonの並列化について
pythonのプログラムを並列化しようと考え、pythonのモジュールであるmultiprocessingを実装した所以下のエラーが発生致しました。
使用しているモジュールは主に遺伝的アルゴリズム用のDEAPというモジュールです。
DEAPのリファレンス
コメントであるようにpool = multiprocessing.Pool(3)
をmainの下に置いた所プロセスは5つ生まれマルチプロセスにはなれたのですが、タスクマネージャーで見たところそのプロセスのうち1つのみ稼働している見た目だけマルチプロセスになってしまいました。
http://gazo.shitao.info/r/i/20160125185509_000.png
1つだけ25%でそれ以外0%であり、尚且つプロセスを3つ生み出す命令しているのにプロセスが4つ生み出されているため、プロセスを生み出す処理と動作する処理を別に行っていると考えられるのですがその理由がわかりません。どのようにするべきか教えてくれれば助かります。
import random
import numpy
import matplotlib.pyplot
import time
import multiprocessing
from multiprocessing import Lock
#from scoop import futures
from deap import algorithms
from deap import base
from deap import creator
from deap import tools
toolbox = base.Toolbox()
def main():
# random.seed(64)
NGEN = 5
MU = 75
LAMBDA = 75
CXPB = 0.6
MUTPB = 0.3
pop = toolbox.population(n=MU)
hof = tools.ParetoFront()
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", numpy.mean, axis=0)
stats.register("std", numpy.std, axis=0)
stats.register("min", numpy.min, axis=0)
stats.register("max", numpy.max, axis=0)
algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats,
halloffame=hof)
return pop, stats, hof
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
toolbox.register('map', pool.map_async)
for var in range(0,5):
・・・・・
追記
並列化に関してですが、このページを参考にしております。DEAPの並列化
ここにあるようにMultiprocessing Moduleを用いて並列化しており、中身としては以下のアルゴリズム中にあるmap関数でmultiprocessing.Pool.map_async
を呼び出すことで並列化計算していると考えています。
どこを並列化したいのかという点ですが、コメントにあるようにアルゴリズム中でmap関数が使われている点が個体の評価という点ですのでそこを並列化したいと考えています。
並列化していないバージョンですが、if __name__ == '__main__':
以下の
pool = multiprocessing.Pool(processes=3)
toolbox.register('map', pool.map_async)
をコメントアウトするなどで動作させない場合プロセスが生まれず通常通りに動きました。
また、参考にしたコードを実装した場合しっかりマルチプロセスで動いておりPCの問題ではないと思いますが、自分のコードではどこが原因となって見た目マルチプロセスになっているのかわかりません。
よろしくお願いします。
追記2
自分のコードですがこのような形になります。
全コード
[参考にしたコード]同様にDEAPモジュールのalgorithmsを以下のように変更しております。
変更点
def eaMuPlusLambdaの
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
を
jobs = toolbox.map(toolbox.evaluate, invalid_ind)
fitnesses=jobs.get()
としています。全文を以下に載せます。
algorithms
def eaMuPlusLambda(population, toolbox, mu, lambda_, cxpb, mutpb, ngen,
stats=None, halloffame=None, verbose=__debug__):
logbook = tools.Logbook()
logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])
# Evaluate the individuals with an invalid fitness
invalid_ind = [ind for ind in population if not ind.fitness.valid]
jobs= toolbox.map(toolbox.evaluate, invalid_ind)
fitnesses=jobs.get()
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
if halloffame is not None:
halloffame.update(population)
record = stats.compile(population) if stats is not None else {}
logbook.record(gen=0, nevals=len(invalid_ind), **record)
if verbose:
print logbook.stream
# Begin the generational process
for gen in range(1, ngen+1):
# Vary the population
offspring = varOr(population, toolbox, lambda_, cxpb, mutpb)
# Evaluate the individuals with an invalid fitness
invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
jobs = toolbox.map(toolbox.evaluate, invalid_ind)
fitnesses=jobs.get()
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
# Update the hall of fame with the generated individuals
if halloffame is not None:
halloffame.update(offspring)
# Select the next generation population
population[:] = toolbox.select(population + offspring, mu)
# Update the statistics with the new population
record = stats.compile(population) if stats is not None else {}
logbook.record(gen=gen, nevals=len(invalid_ind), **record)
if verbose:
print logbook.stream
return population, logbook
追記3
簡単なコード
# coding: UTF-8
'''
'''
import multiprocessing
def nibai(x):
print multiprocessing.current_process()
return x*x
def aaa():
p = Pool(3)
print p.map(nibai, range(100))
if __name__ == "__main__":
aaa()