Skip to content

Commit b383e56

Browse files
committed
Shut down cluster at the end of execution
1 parent 8e38853 commit b383e56

File tree

2 files changed

+104
-67
lines changed

2 files changed

+104
-67
lines changed

‎mlp-ea-decentralized/common/ga/cluster.go‎

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Cluster struct {
2121
ReceiveBestIndividual chan Individual
2222
list *memberlist.Memberlist
2323
delegate *nodeDelegate
24+
stop chan bool
2425
}
2526

2627
type eventHandler struct {
@@ -35,6 +36,7 @@ func MakeCluster(listenPort int, buffSize int, newBestIndividualChan chan Indivi
3536
BoostrapPeers: boostrapPeers,
3637
BroadcastBestIndividual: make(chan Individual, buffSize),
3738
ReceiveBestIndividual: newBestIndividualChan,
39+
stop: make(chan bool),
3840
}
3941

4042
// If a logger is provided, set it instead of the logrus standard one
@@ -48,19 +50,32 @@ func MakeCluster(listenPort int, buffSize int, newBestIndividualChan chan Indivi
4850
// PrintMembers of the p2p cluster
4951
func (c *Cluster) PrintMembers() {
5052
for {
51-
time.Sleep(5 * time.Second)
53+
select {
54+
default:
55+
time.Sleep(5 * time.Second)
5256

53-
nodes := c.list.Members()
54-
str := fmt.Sprintf("Nodes - %d:\n", len(nodes))
57+
nodes := c.list.Members()
58+
str := fmt.Sprintf("Nodes - %d:\n", len(nodes))
5559

56-
for _, node := range nodes {
57-
str += fmt.Sprintf("%s:%d - %s\n", node.Addr.String(), node.Port, node.Name)
60+
for _, node := range nodes {
61+
str += fmt.Sprintf("%s:%d - %s\n", node.Addr.String(), node.Port, node.Name)
62+
}
63+
64+
c.Logger.Info(str)
65+
66+
case <-c.stop:
67+
return
5868
}
5969

60-
c.Logger.Info(str)
6170
}
6271
}
6372

73+
// Shutdown the cluster node
74+
func (c *Cluster) Shutdown() {
75+
c.Logger.Info("Shutting down cluster")
76+
close(c.stop)
77+
}
78+
6479
// Start creates a new node with metadata for this process and joins a existing cluster
6580
// by connecting to peers in cluster.boostrapPeers array
6681
func (c *Cluster) Start(metadata NodeMetadata) {
@@ -100,10 +115,16 @@ func (c *Cluster) Start(metadata NodeMetadata) {
100115
//go c.updateMetadataHandler()
101116
go c.PrintMembers()
102117

103-
// TODO: Add stop channel and select
104118
for {
105-
indiv := <-c.BroadcastBestIndividual
106-
c.delegate.Broadcast(indiv)
119+
select {
120+
default:
121+
indiv := <-c.BroadcastBestIndividual
122+
c.delegate.Broadcast(indiv)
123+
124+
case <-c.stop:
125+
return
126+
}
127+
107128
}
108129
}
109130

‎mlp-ea-decentralized/common/ga/poolModel.go‎

Lines changed: 74 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func (pool *PoolModel) Minimize() {
128128

129129
// Launch cluster membership service
130130
go pool.cluster.Start(pool.getCurrentNodeMetadata())
131+
defer pool.cluster.Shutdown()
131132

132133
// Append all individuals in the population to the evaluation channel
133134
pool.population.Fold(func(indiv eaopt.Individual) bool {
@@ -390,6 +391,7 @@ func (pool *PoolModel) handleBroadcastedIndividual() {
390391
} else {
391392
Log.Warn("Got broadcasted solution worse than currest best one. Ignoring...")
392393
}
394+
393395
case <-pool.stop:
394396
return
395397
}
@@ -447,31 +449,38 @@ func (pool *PoolModel) populationGrowControl() {
447449
Log.Infoln("populationGrowControl goroutine started")
448450

449451
for {
450-
time.Sleep(1 * time.Second)
452+
select {
453+
default:
454+
time.Sleep(1 * time.Second)
451455

452-
snap := pool.population.Snapshot()
453-
indivArr := make([]eaopt.Individual, len(snap))
454-
var bestCopy eaopt.Individual
455-
i := 0
456+
snap := pool.population.Snapshot()
457+
indivArr := make([]eaopt.Individual, len(snap))
458+
var bestCopy eaopt.Individual
459+
i := 0
456460

457-
for _, item := range snap {
458-
indivArr[i] = item.Object.(eaopt.Individual)
459-
i++
460-
}
461+
for _, item := range snap {
462+
indivArr[i] = item.Object.(eaopt.Individual)
463+
i++
464+
}
461465

462-
indivArr = pool.SortFunc(indivArr, pool.SortPrecission)
466+
indivArr = pool.SortFunc(indivArr, pool.SortPrecission)
463467

464-
for i := pool.PopSize; i < len(indivArr); i++ {
465-
pool.population.Remove(indivArr[i])
466-
}
468+
for i := pool.PopSize; i < len(indivArr); i++ {
469+
pool.population.Remove(indivArr[i])
470+
}
467471

468-
// Replace the remaining worst one by the best one so far
469-
if len(indivArr) >= pool.PopSize {
470-
pool.population.Remove(indivArr[pool.PopSize-1])
472+
// Replace the remaining worst one by the best one so far
473+
if len(indivArr) >= pool.PopSize {
474+
pool.population.Remove(indivArr[pool.PopSize-1])
471475

472-
bestCopy = pool.BestSolution.Clone(pool.Rnd)
473-
pool.population.Add(bestCopy)
476+
bestCopy = pool.BestSolution.Clone(pool.Rnd)
477+
pool.population.Add(bestCopy)
478+
}
479+
480+
case <-pool.stop:
481+
return
474482
}
483+
475484
}
476485
}
477486

@@ -483,61 +492,68 @@ func (pool *PoolModel) migrationScheduler() {
483492
Log.Infoln("migrationScheduler goroutine started")
484493

485494
for {
486-
time.Sleep(5 * time.Second)
487-
var conn *grpc.ClientConn
488-
dialed := false
495+
select {
496+
default:
497+
time.Sleep(5 * time.Second)
498+
var conn *grpc.ClientConn
499+
dialed := false
489500

490-
snap := pool.population.Snapshot()
491-
indivArr := make([]eaopt.Individual, 0, len(snap))
492-
migrate := make([]Individual, pool.NMigrate)
501+
snap := pool.population.Snapshot()
502+
indivArr := make([]eaopt.Individual, 0, len(snap))
503+
migrate := make([]Individual, pool.NMigrate)
493504

494-
for _, item := range snap {
495-
indivArr = append(indivArr, item.Object.(eaopt.Individual).Clone(pool.Rnd))
496-
}
505+
for _, item := range snap {
506+
indivArr = append(indivArr, item.Object.(eaopt.Individual).Clone(pool.Rnd))
507+
}
497508

498-
// Sort population and get NMigrate best
499-
indivArr = pool.SortFunc(indivArr, pool.SortPrecission)
500-
for i := range migrate {
501-
migrate[i] = Individual{
502-
IndividualID: indivArr[i].ID,
503-
Fitness: indivArr[i].Fitness,
504-
Evaluated: indivArr[i].Evaluated,
505-
Genome: pool.Delegate.SerializeGenome(indivArr[i].Genome),
509+
// Sort population and get NMigrate best
510+
indivArr = pool.SortFunc(indivArr, pool.SortPrecission)
511+
for i := range migrate {
512+
migrate[i] = Individual{
513+
IndividualID: indivArr[i].ID,
514+
Fitness: indivArr[i].Fitness,
515+
Evaluated: indivArr[i].Evaluated,
516+
Genome: pool.Delegate.SerializeGenome(indivArr[i].Genome),
517+
}
506518
}
507-
}
508519

509-
// Pick a random node from the cluster and dial it
510-
members := pool.cluster.GetMembers()
511-
for !dialed {
512-
remotePeer := members[pool.Rnd.Intn(len(members))]
513-
remoteMetadata := NodeMetadata{}
520+
// Pick a random node from the cluster and dial it
521+
members := pool.cluster.GetMembers()
522+
for !dialed {
523+
remotePeer := members[pool.Rnd.Intn(len(members))]
524+
remoteMetadata := NodeMetadata{}
514525

515-
err := remoteMetadata.Unmarshal(remotePeer.Meta)
516-
if err != nil {
517-
Log.Errorf("Could not parse Node Metadata on migrationScheduler. %s", err.Error())
526+
err := remoteMetadata.Unmarshal(remotePeer.Meta)
527+
if err != nil {
528+
Log.Errorf("Could not parse Node Metadata on migrationScheduler. %s", err.Error())
529+
530+
}
531+
532+
remoteAddr := fmt.Sprintf("%s:%d",
533+
remotePeer.Addr.String(), remoteMetadata.GrpcPort)
518534

535+
conn, err = grpc.Dial(remoteAddr, grpc.WithInsecure())
536+
if err != nil {
537+
Log.Warnf("Could not dial %s. %s", remoteAddr, err.Error())
538+
}
539+
540+
dialed = true
519541
}
520542

521-
remoteAddr := fmt.Sprintf("%s:%d",
522-
remotePeer.Addr.String(), remoteMetadata.GrpcPort)
543+
client := NewDistributedEAClient(conn)
544+
_, err := client.MigrateIndividuals(context.Background(), &IndividualsBatch{
545+
Individuals: migrate,
546+
})
523547

524-
conn, err = grpc.Dial(remoteAddr, grpc.WithInsecure())
525548
if err != nil {
526-
Log.Warnf("Could not dial %s. %s", remoteAddr, err.Error())
549+
Log.Errorf("could not MigrateIndividuals to %s. %s", conn.Target(), err.Error())
527550
}
528551

529-
dialed = true
530-
}
552+
conn.Close()
531553

532-
client := NewDistributedEAClient(conn)
533-
_, err := client.MigrateIndividuals(context.Background(), &IndividualsBatch{
534-
Individuals: migrate,
535-
})
536-
537-
if err != nil {
538-
Log.Errorf("could not MigrateIndividuals to %s. %s", conn.Target(), err.Error())
554+
case <-pool.stop:
555+
return
539556
}
540557

541-
conn.Close()
542558
}
543559
}

0 commit comments

Comments
 (0)