001/*******************************************************************************
002 * Copyright (c) 2016 Pablo Pavon Mariņo.
003 * All rights reserved. This program and the accompanying materials
004 * are made available under the terms of the GNU Lesser Public License v2.1
005 * which accompanies this distribution, and is available at
006 * http://www.gnu.org/licenses/lgpl.html
007 ******************************************************************************/
008package com.net2plan.examples.ocnbook.onlineSim;
009
010
011
012
013import java.io.File;
014import java.io.FileOutputStream;
015import java.io.PrintStream;
016import java.util.List;
017import java.util.Map;
018import java.util.Random;
019import java.util.Set;
020
021import cern.colt.matrix.tdouble.DoubleFactory1D;
022import cern.colt.matrix.tdouble.DoubleFactory2D;
023import cern.colt.matrix.tdouble.DoubleMatrix1D;
024import cern.colt.matrix.tdouble.DoubleMatrix2D;
025import cern.jet.math.tdouble.DoubleFunctions;
026
027import com.jom.OptimizationProblem;
028import com.net2plan.interfaces.networkDesign.Demand;
029import com.net2plan.interfaces.networkDesign.Link;
030import com.net2plan.interfaces.networkDesign.Net2PlanException;
031import com.net2plan.interfaces.networkDesign.NetPlan;
032import com.net2plan.interfaces.networkDesign.Route;
033import com.net2plan.interfaces.simulation.IEventProcessor;
034import com.net2plan.interfaces.simulation.SimEvent;
035import com.net2plan.libraries.NetworkPerformanceMetrics;
036import com.net2plan.utils.Constants.RoutingType;
037import com.net2plan.utils.InputParameter;
038import com.net2plan.utils.Pair;
039import com.net2plan.utils.TimeTrace;
040import com.net2plan.utils.Triple;
041
042/** 
043 * This module implements a distributed dual-gradient based algorithm, for adapting the demand injected traffic (congestion control) in the network, to maximize the network utility enforcing a fair allocation of the resources.
044 *
045 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 
046 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 
047 * The time evolution of different metrics can be stored in output files, for later processing. 
048 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec10_4_congestionControlDual.m">{@code fig_sec10_4_congestionControlDual.m}</a> MATLAB file used for generating the graph/s of the case study in the 
049 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm.
050 * 
051 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator.
052 * 
053 * @net2plan.keywords Bandwidth assignment (BA), Distributed algorithm, Dual gradient algorithm
054 * @net2plan.ocnbooksections Section 10.4
055 * @net2plan.inputParameters 
056 * @author Pablo Pavon-Marino
057 */
058public class Online_evProc_congestionControlDual extends IEventProcessor 
059{
060        private Random rng;
061
062        private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages");
063        private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true);
064        private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true);
065        private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true);
066        private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
067        private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true);
068        private InputParameter update_isSynchronous = new InputParameter ("update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state");
069        private InputParameter update_averageInterUpdateTime = new InputParameter ("update_averageInterUpdateTime", 1.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true);
070        private InputParameter update_maxFluctuationInterUpdateTime = new InputParameter ("update_maxFluctuationInterUpdateTime", 0.5 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
071        private InputParameter gradient_gammaStep = new InputParameter ("gradient_gammaStep", 0.0001 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true);
072        private InputParameter gradient_maxGradientAbsoluteNoise = new InputParameter ("gradient_maxGradientAbsoluteNoise", 0.0 , "Max value of the added noise to the gradient coordinate in absolute values" , 0 , true , Double.MAX_VALUE , true);
073
074        private InputParameter simulation_maxNumberOfUpdateIntervals = new InputParameter ("simulation_maxNumberOfUpdateIntervals", 700.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true);
075        private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator");
076        private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "congestionControlDual" , "Root of the file name to be used in the output files. If blank, no output");
077
078        private InputParameter control_minHd = new InputParameter ("control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true);
079        private InputParameter control_maxHd = new InputParameter ("control_maxHd", 1.0E6 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true);
080        private InputParameter control_fairnessFactor = new InputParameter ("control_fairnessFactor", 2.0 , "Fairness factor in utility function of congestion control" , 0 , true , Double.MAX_VALUE , true);
081        private InputParameter control_initialLinkPrices = new InputParameter ("control_initialLinkPrices", 1 , "Initial value of the link weights" , 0 , true , Double.MAX_VALUE , true);
082
083        private static final int SIGNALING_WAKEUPTOSENDMESSAGE = 400;
084        private static final int SIGNALING_RECEIVEDMESSAGE = 401;
085        private static final int UPDATE_WAKEUPTOUPDATE = 402;
086
087        private NetPlan currentNetPlan;
088        private int N,E,D;
089        private DoubleMatrix1D congControl_price_e;
090        private DoubleMatrix2D control_mostUpdatedLinkPriceKnownByDemand_de; 
091        
092        private TimeTrace stat_traceOf_hd;
093        private TimeTrace stat_traceOf_objFunction;
094        private TimeTrace stat_traceOf_pie;
095        private TimeTrace stat_traceOf_ye;
096
097        @Override
098        public String getDescription()
099        {
100                return "This module implements a distributed dual-gradient based algorithm, for adapting the demand injected traffic (congestion control) in the network, to maximize the network utility enforcing a fair allocation of the resources.";
101        }
102
103        @Override
104        public List<Triple<String, String, String>> getParameters()
105        {
106                /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */
107                return InputParameter.getInformationAllInputParameterFieldsOfObject(this);
108        }
109
110        @Override
111        public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters)
112        {
113                /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */
114                InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters);
115
116                this.currentNetPlan = currentNetPlan;
117                if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks");
118
119                /* Remove all routes, and create one with the shortest path in km for each demand */
120                currentNetPlan.removeAllUnicastRoutingInformation();
121                currentNetPlan.setRoutingType(RoutingType.SOURCE_ROUTING);
122                currentNetPlan.addRoutesFromCandidatePathList(currentNetPlan.getVectorLinkLengthInKm().toArray()  , "K" , "1");
123                for (Route r : currentNetPlan.getRoutes ()) r.setCarriedTraffic(r.getDemand().getOfferedTraffic() , r.getDemand().getOfferedTraffic());
124                
125                this.rng = new Random (simulation_randomSeed.getLong());
126                this.currentNetPlan = currentNetPlan;
127                this.D = currentNetPlan.getNumberOfDemands ();
128                this.E = currentNetPlan.getNumberOfLinks ();
129                this.N = currentNetPlan.getNumberOfNodes ();
130
131                /* Set the initial prices in the links: 1.0 */
132                this.congControl_price_e = DoubleFactory1D.dense.make (E , control_initialLinkPrices.getDouble());
133                
134                /* Initialize the information each demand knows of the prices of all the links */
135                this.control_mostUpdatedLinkPriceKnownByDemand_de = DoubleFactory2D.dense.make (D,E,control_initialLinkPrices.getDouble());
136                
137                /* Compute the traffic each demand injects, update the routes keeping the fraction */
138                for (Demand d : currentNetPlan.getDemands())
139                {
140                        final double new_hd = this.computeHdFromPrices(d);
141                        if (Double.isNaN(new_hd)) throw new RuntimeException ("Bad");
142                        final double old_hd = d.getOfferedTraffic();
143                        d.setOfferedTraffic(new_hd);
144                        final Set<Route> routes = d.getRoutes();
145                        final double increasingFactor = (old_hd == 0)?  Double.MAX_VALUE : new_hd/old_hd;
146                        for (Route r : routes)
147                        {
148                                final double old_hr = r.getCarriedTraffic();
149                                final double new_hr = (old_hd == 0)? new_hd / routes.size() : old_hr * increasingFactor;
150                                if (Double.isNaN(old_hr)) throw new RuntimeException ("Bad");
151                                if (Double.isNaN(new_hr)) throw new RuntimeException ("Bad");
152                                r.setCarriedTraffic(new_hr , new_hr);
153                        }
154                        if (Math.abs(d.getOfferedTraffic() - d.getCarriedTraffic()) > 1E-3) throw new RuntimeException ("Bad");
155                }
156                
157                /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */
158                for (Link e : currentNetPlan.getLinks())
159                {
160                        final double signalingTime = (signaling_isSynchronous.getBoolean())? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
161                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , e));
162                }
163                for (Demand d : currentNetPlan.getDemands())
164                {
165                        final double updateTime = (update_isSynchronous.getBoolean())? update_averageInterUpdateTime.getDouble() : Math.max(0 , update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
166                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE , d));
167                }
168
169                /* Intialize the traces */
170                this.stat_traceOf_hd = new TimeTrace ();
171                this.stat_traceOf_pie = new TimeTrace  ();
172                this.stat_traceOf_ye = new TimeTrace  ();
173                this.stat_traceOf_objFunction = new TimeTrace (); 
174
175                this.stat_traceOf_hd.add(0.0, this.currentNetPlan.getVectorDemandOfferedTraffic());
176                this.stat_traceOf_pie.add(0.0, this.congControl_price_e.copy ());
177                this.stat_traceOf_ye.add(0.0, this.currentNetPlan.getVectorLinkTotalCarriedTraffic());
178                this.stat_traceOf_objFunction.add(0.0, NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble()));
179
180        }
181
182        @Override
183        public void processEvent(NetPlan currentNetPlan, SimEvent event)
184        {
185                final double t = event.getEventTime();
186                switch (event.getEventType())
187                {
188                case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nt for any destination
189                {
190                        final Pair<Demand,Pair<Link,Double>> signalInfo = (Pair<Demand,Pair<Link,Double>>) event.getEventObject();
191                        final Demand d = signalInfo.getFirst();
192                        final Pair<Link,Double> receivedInfo_price_e = signalInfo.getSecond();
193                        this.control_mostUpdatedLinkPriceKnownByDemand_de.set(d.getIndex() , receivedInfo_price_e.getFirst().getIndex () ,receivedInfo_price_e.getSecond());  
194                        break;
195                }
196                
197                case SIGNALING_WAKEUPTOSENDMESSAGE: 
198                {
199                        final Link eMe = (Link) event.getEventObject();
200
201                        /* Update the new price with the gradient approach */
202                        final double u_e = eMe.getCapacity();
203                        final double y_e = eMe.getCarriedTrafficIncludingProtectionSegments();
204                        final double old_pie = this.congControl_price_e.get(eMe.getIndex());
205                        final double new_pie = Math.max(0, old_pie - this.gradient_gammaStep.getDouble() * (u_e - y_e) + 2*gradient_maxGradientAbsoluteNoise.getDouble()*(rng.nextDouble()-0.5));
206                        this.congControl_price_e.set(eMe.getIndex(), new_pie);
207                        
208                        /* Create the info I will signal */
209                        Pair<Link,Double> infoToSignal = Pair.of(eMe ,  new_pie);
210
211                        /* Send the events of the signaling information messages to all the nodes */
212                        for (Route route : eMe.getTraversingRoutes())
213                        {
214                                if (rng.nextDouble() < this.signaling_signalingLossProbability.getDouble()) continue; // the signaling may be lost => lost to all demands
215                                final Demand d = route.getDemand();
216                                final double signalingReceptionTime = t + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5));
217                                this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Pair.of(d , infoToSignal)));
218                        }
219                        
220                        /* Re-schedule when to wake up again */
221                        final double signalingTime = signaling_isSynchronous.getBoolean()? t + signaling_averageInterMessageTime.getDouble() : Math.max(t , t + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
222                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , eMe));
223                        break;
224                }
225
226                case UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available
227                {
228                        final Demand dMe = (Demand) event.getEventObject();
229                        
230                        /* compute the new h_d and apply it */
231                        final double new_hd = computeHdFromPrices (dMe);
232                        final double old_hd = dMe.getCarriedTraffic();
233                        dMe.setOfferedTraffic(new_hd);
234                        final Set<Route> routes = dMe.getRoutes();
235                        final double increasingFactor = (old_hd == 0)?  Double.MAX_VALUE : new_hd/old_hd;
236                        for (Route r : routes)
237                        {
238                                final double old_hr = r.getCarriedTraffic();
239                                final double new_hr = (old_hd == 0)? new_hd / routes.size() : old_hr * increasingFactor;
240                                r.setCarriedTraffic(new_hr , new_hr);
241                        }
242//                      if (Math.abs(currentNetPlan.getDemandOfferedTraffic(dIdMe) - currentNetPlan.getDemandCarriedTraffic(dIdMe)) > 1E-3) throw new RuntimeException ("Bad");
243
244                        final double updateTime = update_isSynchronous.getBoolean()? t + update_averageInterUpdateTime.getDouble() : Math.max(t , t + update_averageInterUpdateTime.getDouble() + update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
245                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_WAKEUPTOUPDATE,  dMe));
246
247                        this.stat_traceOf_hd.add(t, this.currentNetPlan.getVectorDemandOfferedTraffic());
248                        this.stat_traceOf_pie.add(t, this.congControl_price_e.copy ());
249                        this.stat_traceOf_ye.add(t, this.currentNetPlan.getVectorLinkTotalCarriedTraffic());
250                        this.stat_traceOf_objFunction.add(t, NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble()));
251
252                        if (t > this.simulation_maxNumberOfUpdateIntervals.getDouble() * this.update_averageInterUpdateTime.getDouble()) { this.endSimulation (); }
253                        
254                        break;
255                }
256                        
257
258                default: throw new RuntimeException ("Unexpected received event");
259                }
260                
261                
262        }
263
264        public String finish (StringBuilder st , double simTime)
265        {
266                if (simulation_outFileNameRoot.getString().equals("")) return null;
267                stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt"));
268                stat_traceOf_pie.printToFile(new File (simulation_outFileNameRoot.getString() + "_pie.txt"));
269                stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt"));
270                stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt"));
271                Triple<DoubleMatrix1D,DoubleMatrix1D,Double> pair = computeOptimumSolution ();
272                DoubleMatrix1D h_d_opt = pair.getFirst();
273                DoubleMatrix1D pi_e = pair.getSecond();
274                double optCost = pair.getThird();  
275                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), optCost);
276                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), h_d_opt);
277                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_pie.txt"), pi_e);
278                return null;
279        }
280        
281        private double computeHdFromPrices (Demand d)
282        {
283                DoubleMatrix1D infoIKnow_price_e = this.control_mostUpdatedLinkPriceKnownByDemand_de.viewRow (d.getIndex ());
284
285                /* compute the demand price as weighted sum in the routes of route prices  */
286                double demandWeightedSumLinkPrices = 0;
287                double demandCarriedTraffic = 0; 
288                for (Route r : d.getRoutes ())
289                {
290                        final double h_r = r.getCarriedTraffic();
291                        demandCarriedTraffic += h_r;
292                        for (Link e : r.getSeqLinksRealPath())
293                                demandWeightedSumLinkPrices += h_r * infoIKnow_price_e.get(e.getIndex ());
294                }
295                demandWeightedSumLinkPrices /= demandCarriedTraffic;
296
297                /* compute the new h_d */
298                final double new_hd = Math.max(this.control_minHd.getDouble() , Math.min(this.control_maxHd.getDouble(), Math.pow(demandWeightedSumLinkPrices, -1/this.control_fairnessFactor.getDouble())));
299                return new_hd;
300        }
301
302        private Triple<DoubleMatrix1D,DoubleMatrix1D,Double> computeOptimumSolution ()
303        {
304                /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */
305                final int D = this.currentNetPlan.getNumberOfDemands();
306
307                OptimizationProblem op = new OptimizationProblem();
308
309                /* Add the decision variables to the problem */
310                op.addDecisionVariable("h_d", false, new int[] {1, D}, control_minHd.getDouble(), control_maxHd.getDouble());
311
312                /* Set some input parameters */
313                op.setInputParameter("u_e", currentNetPlan.getVectorLinkCapacity() , "row");
314                op.setInputParameter("alpha", control_fairnessFactor.getDouble());
315                op.setInputParameter("R_de", currentNetPlan.getMatrixDemand2LinkAssignment());
316
317                /* Sets the objective function */
318                if (control_fairnessFactor.getDouble() == 1)
319                    op.setObjectiveFunction("maximize", "sum(ln(h_d))");
320                else if (control_fairnessFactor.getDouble() == 0)
321                    op.setObjectiveFunction("maximize", "sum(h_d)");
322                else
323                    op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))");
324
325                op.addConstraint("h_d * R_de <= u_e" , "pi_e"); // the capacity constraints (E constraints)
326
327                /* Call the solver to solve the problem */
328                op.solve("ipopt");
329
330                /* If an optimal solution was not found, quit */
331                if (!op.solutionIsOptimal()) throw new Net2PlanException("An optimal solution was not found");
332
333                /* Retrieve the optimum solutions */
334                DoubleMatrix1D h_d = op.getPrimalSolution("h_d").view1D ();
335                DoubleMatrix1D pi_e = op.getMultipliersOfConstraint("pi_e").assign(DoubleFunctions.abs).view1D ();
336                
337                return Triple.of(h_d,pi_e,NetworkPerformanceMetrics.alphaUtility(currentNetPlan.getVectorDemandOfferedTraffic() , control_fairnessFactor.getDouble()));
338        }
339        
340}