001/*******************************************************************************
002 * Copyright (c) 2016 Pablo Pavon Mariņo.
003 * All rights reserved. This program and the accompanying materials
004 * are made available under the terms of the GNU Lesser Public License v2.1
005 * which accompanies this distribution, and is available at
006 * http://www.gnu.org/licenses/lgpl.html
007 ******************************************************************************/
008package com.net2plan.examples.ocnbook.onlineSim;
009
010
011
012
013import java.io.File;
014import java.io.FileOutputStream;
015import java.io.PrintStream;
016import java.util.Arrays;
017import java.util.HashMap;
018import java.util.LinkedList;
019import java.util.List;
020import java.util.Map;
021import java.util.Random;
022
023import cern.colt.matrix.tdouble.DoubleMatrix1D;
024import cern.colt.matrix.tdouble.DoubleMatrix2D;
025import cern.colt.matrix.tint.IntFactory3D;
026import cern.colt.matrix.tint.IntMatrix3D;
027import cern.jet.math.tdouble.DoubleFunctions;
028import cern.jet.random.tdouble.Poisson;
029
030import com.jom.OptimizationProblem;
031import com.net2plan.interfaces.networkDesign.Demand;
032import com.net2plan.interfaces.networkDesign.Link;
033import com.net2plan.interfaces.networkDesign.Net2PlanException;
034import com.net2plan.interfaces.networkDesign.NetPlan;
035import com.net2plan.interfaces.networkDesign.Node;
036import com.net2plan.interfaces.networkDesign.Route;
037import com.net2plan.interfaces.simulation.IEventProcessor;
038import com.net2plan.interfaces.simulation.SimEvent;
039import com.net2plan.utils.Constants.RoutingType;
040import com.net2plan.utils.DoubleUtils;
041import com.net2plan.utils.InputParameter;
042import com.net2plan.utils.IntUtils;
043import com.net2plan.utils.Pair;
044import com.net2plan.utils.TimeTrace;
045import com.net2plan.utils.Triple;
046
047/** 
048 * This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources.
049 *
050 * Ths event processor is adapted to permit observing the algorithm performances under user-defined conditions, 
051 * including asynchronous distributed executions, where signaling can be affected by losses and/or delays, and/or measurement errors. 
052 * The time evolution of different metrics can be stored in output files, for later processing. 
053 * As an example, see the <a href="../../../../../../graphGeneratorFiles/fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m">{@code fig_sec11_4_congControlAndBackpressureRouting_dualDecomp.m}</a> MATLAB file used for generating the graph/s of the case study in the 
054 * <a href="http://eu.wiley.com/WileyCDA/WileyTitle/productCd-1119013356.html">book</a> using this algorithm.
055 * 
056 * To simulate a network with this module, use the {@code Online_evGen_doNothing} generator.
057 * 
058 * @net2plan.keywords Bandwidth assignment (BA), Flow assignment (FA), Backpressure routing, Distributed algorithm, Dual decomposition 
059 * @net2plan.ocnbooksections Section 11.4
060 * @net2plan.inputParameters 
061 * @author Pablo Pavon-Marino
062 */
063public class Online_evProc_congControlAndBackpressureRoutingDualDecomp extends IEventProcessor 
064{
065        private static PrintStream getNulFile () { try { return new PrintStream (new FileOutputStream ("NUL") , false); } catch (Exception e) {e.printStackTrace(); throw new RuntimeException ("Not NUL file"); }   } 
066        private PrintStream log = getNulFile (); //System.err;//new PrintStream (new FileOutputStream ("NUL") , true);
067
068        /******************** MAC ****************/
069        static final int CC_UPDATE_WAKEUPTOUPDATE = 402;
070
071        private InputParameter cc_update_isSynchronous = new InputParameter ("cc_update_isSynchronous", false , "true if all the distributed agents involved wake up synchronousely to update its state");
072        private InputParameter cc_update_averageInterUpdateTime = new InputParameter ("cc_update_averageInterUpdateTime", 10.0 , "Average time between two updates of an agent" , 0 , false , Double.MAX_VALUE , true);
073        private InputParameter cc_update_maxFluctuationInterUpdateTime = new InputParameter ("cc_update_maxFluctuationInterUpdateTime", 5.0 , "Max fluctuation in time in the update interval of an agent, in absolute time values. The update intervals are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
074
075        private InputParameter cc_simulation_maxNumberOfUpdateIntervals = new InputParameter ("cc_simulation_maxNumberOfUpdateIntervals", 1000.0 , "Maximum number of update intervals in average per agent" , 0 , false , Double.MAX_VALUE , true);
076        private InputParameter cc_control_minHd = new InputParameter ("cc_control_minHd", 0.1 , "Minimum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true);
077        private InputParameter cc_control_maxHd = new InputParameter ("cc_control_maxHd", 25 , "Maximum traffic assigned to each demand" , 0 , true , Double.MAX_VALUE , true);
078        private InputParameter cc_control_fairnessFactor = new InputParameter ("cc_control_fairnessFactor_1", 2.0 , "Fairness factor in utility function of congestion control for demands" , 0 , true , Double.MAX_VALUE , true);
079        
080
081        private InputParameter simulation_randomSeed = new InputParameter ("simulation_randomSeed", (long) 1 , "Seed of the random number generator");
082        private InputParameter simulation_outFileNameRoot = new InputParameter ("simulation_outFileNameRoot", "crossLayerCongControlBackpressureRoutingDualDecomp" , "Root of the file name to be used in the output files. If blank, no output");
083
084        private InputParameter signaling_isSynchronous = new InputParameter ("signaling_isSynchronous", false , "true if all the distributed agents involved wake up synchronously to send the signaling messages");
085        private InputParameter signaling_averageInterMessageTime = new InputParameter ("signaling_averageInterMessageTime", 1.0 , "Average time between two signaling messages sent by an agent" , 0 , false , Double.MAX_VALUE , true);
086        private InputParameter signaling_maxFluctuationInterMessageTime = new InputParameter ("signaling_maxFluctuationInterMessageTime", 0.5 , "Max fluctuation in time between two signaling messages sent by an agent" , 0 , true , Double.MAX_VALUE , true);
087        private InputParameter signaling_averageDelay = new InputParameter ("signaling_averageDelay", 0.0 , "Average time between signaling message transmission by an agent and its reception by other or others" , 0 , true , Double.MAX_VALUE , true);
088        private InputParameter signaling_maxFluctuationInDelay = new InputParameter ("signaling_maxFluctuationInDelay", 0.0 , "Max fluctuation in time in the signaling delay, in absolute time values. The signaling delays are sampled from a uniform distribution within the given interval" , 0 , true , Double.MAX_VALUE , true);
089        private InputParameter signaling_signalingLossProbability = new InputParameter ("signaling_signalingLossProbability", 0.05 , "Probability that a signaling message transmitted is lost (not received by other or others involved agents)" , 0 , true , Double.MAX_VALUE , true);
090
091        private InputParameter routing_fixedPacketDurationAndSchedulingInterval = new InputParameter ("routing_fixedPacketDurationAndSchedulingInterval", 1.0 , "Fixed slot size (synchronized all links)" , 0 , false , Double.MAX_VALUE , true);
092        private InputParameter routing_numTrafficUnitsOfOnePacket = new InputParameter ("routing_numTrafficUnitsOfOnePacket", 1.0 , "One packet in one slot time is this traffic" , 0 , false , Double.MAX_VALUE , true);
093        private InputParameter routing_statNumSchedSlotBetweenN2PRecomputing = new InputParameter ("routing_statNumSchedSlotBetweenN2PRecomputing", (int) 100 , "Number of scheduling slots between two N2P update and storing a stat trace" , 1 , Integer.MAX_VALUE);
094        private InputParameter routing_maxNumberOfN2PStatisticsIntervals = new InputParameter ("routing_maxNumberOfN2PStatisticsIntervals", (int) 15 , "Simulation length in number of scheduling intervals" , 1 , Integer.MAX_VALUE);
095
096        private InputParameter routing_gradient_gammaStep = new InputParameter ("routing_gradient_gammaStep", 0.000005 , "Gamma step in the gradient algorithm" , 0 , false , Double.MAX_VALUE , true);
097        
098        /* Optimum solution */
099        private Pair<NetPlan,double[][]> optNetPlan;
100        
101        /******************* ROUTING ****/
102        
103        private Random rng;
104        int N,E,D;
105        
106        static final int SIGNALING_WAKEUPTOSENDMESSAGE = 300; // A NODE SIGNAL QUEUES TO NEIGHBOR NODES
107        static final int SIGNALING_RECEIVEDMESSAGE = 301;  // A NODE RECEIVED SIGNAL QUEUES TO NEIGHBOR NODES
108        static final int UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS = 302; // ALL NODES SCHEDULE ONE PACKET PER LINK
109        static final int UPDATE_STATISTICTRACES = 303; // STAT UPDATE OF N2P
110
111
112        private NetPlan currentNetPlan;
113
114        private Map<List<Link>,Route> stat_mapSeqLinks2RouteId;
115        private Map<Route,Integer> stat_mapRouteId2CarriedPacketsLastInterval;
116        private int [] stat_accumNumGeneratedPackets_d;
117        private int [] stat_accumNumReceivedPackets_d;
118        private int [][] stat_accumNumQeueusPackets_nd;
119        
120//      private TimeTrace traceOf_objFunction;
121//      private TimeTrace traceOf_ye;
122        private TimeTrace stat_traceOf_hd;
123        private TimeTrace stat_traceOf_xp; // traces are taken from netPlan, when it is updated (stat_n2pRecomputingInterval)
124        private TimeTrace stat_traceOf_objFunction;
125        private TimeTrace stat_traceOf_ye;
126        private TimeTrace stat_traceOf_queueSizes;
127
128        
129        private IntMatrix3D ctlMostUpdatedQndKnownByNodeN1_n1nd; // sparse for memory efficiency 
130        private int [][] ctlNumPacketsQueue_nd;
131        private Map<Pair<Node,Demand>,LinkedList<PacketInfo>> ctlQueue_nd; 
132
133        @Override
134        public String getDescription()
135        {
136                return "This module implements a distributed dual-decomposition-based gradient algorithm, for a coordinated adjustment of the traffic to inject by each demand (congestion control), and the routing (backpressure based) of this traffic in the network, to maximize the network utility enforcing a fair allocation of the resources.";
137        }
138
139        @Override
140        public List<Triple<String, String, String>> getParameters()
141        {
142                /* Returns the parameter information for all the InputParameter objects defined in this object (uses Java reflection) */
143                return InputParameter.getInformationAllInputParameterFieldsOfObject(this);
144        }
145
146        @Override
147        public void initialize(NetPlan currentNetPlan, Map<String, String> algorithmParameters, Map<String, String> simulationParameters, Map<String, String> net2planParameters)
148        {
149                /* Initialize all InputParameter objects defined in this object (this uses Java reflection) */
150                InputParameter.initializeAllInputParameterFieldsOfObject(this, algorithmParameters);
151
152                this.currentNetPlan = currentNetPlan;
153                if (currentNetPlan.getNumberOfLayers() != 1) throw new Net2PlanException ("This algorithm works in single layer networks");
154
155                /* Update the indexes. The set of nodes, links and demands cannot change during the algorithm */
156                this.N = this.currentNetPlan.getNumberOfNodes();
157                this.E = this.currentNetPlan.getNumberOfLinks();
158                this.D = this.currentNetPlan.getNumberOfDemands();
159                this.rng = new Random (simulation_randomSeed.getLong());
160                if ((E == 0) || (D == 0)) throw new Net2PlanException ("The input design should have links and demands");
161                
162                /* Sets the initial routing, with all the traffic balanced equally in all the paths of the demand */
163                this.stat_mapSeqLinks2RouteId = new HashMap<List<Link>,Route> ();
164                this.stat_mapRouteId2CarriedPacketsLastInterval = new HashMap<Route,Integer> ();
165
166                /* Computes the optimum NetPlan */
167                this.currentNetPlan.removeAllUnicastRoutingInformation();
168                this.currentNetPlan.setRoutingType(RoutingType.SOURCE_ROUTING);
169
170                this.currentNetPlan.removeAllRoutes();
171                this.optNetPlan = computeOptimumSolution ();
172                /* Add the routes in the optimum solution to the netPlan, with zero traffic. For having teh same IDs later */
173                this.currentNetPlan.copyFrom(optNetPlan.getFirst());
174                for (Route r: currentNetPlan.getRoutes())
175                {
176                        r.setCarriedTraffic(0.0 , 0.0);
177                        this.stat_mapSeqLinks2RouteId.put(r.getSeqLinksRealPath(),r);
178                        this.stat_mapRouteId2CarriedPacketsLastInterval.put(r, 0);
179                }
180                
181
182                this.ctlQueue_nd = new HashMap<Pair<Node,Demand>,LinkedList<PacketInfo>> ();
183                for (Node n: this.currentNetPlan.getNodes())
184                        for (Demand d: this.currentNetPlan.getDemands())
185                                { ctlQueue_nd.put(Pair.of(n, d), new LinkedList<PacketInfo> ());  }
186
187                this.ctlMostUpdatedQndKnownByNodeN1_n1nd = IntFactory3D.sparse.make(N, N, D); 
188                this.ctlNumPacketsQueue_nd = new int [N][D];
189                
190                this.stat_accumNumGeneratedPackets_d = new int [D];
191                this.stat_accumNumReceivedPackets_d = new int [D];
192                this.stat_accumNumQeueusPackets_nd= new int [N][D]; 
193                
194                /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */
195                this.scheduleEvent(new SimEvent (routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * routing_fixedPacketDurationAndSchedulingInterval.getDouble(), SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1));
196                this.scheduleEvent(new SimEvent (routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1));
197                for (Node n : currentNetPlan.getNodes())
198                {
199                        final double signalingTime = signaling_isSynchronous.getBoolean()? signaling_averageInterMessageTime.getDouble() : Math.max(0 , signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
200                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , n));
201                }
202
203                /* Intialize the traces */
204                this.stat_traceOf_xp = new TimeTrace ();
205                this.stat_traceOf_ye = new TimeTrace ();
206                this.stat_traceOf_queueSizes = new TimeTrace ();
207                this.stat_traceOf_objFunction = new TimeTrace (); 
208                this.stat_traceOf_hd = new TimeTrace ();
209                this.stat_traceOf_xp.add(0.0 , netPlanRouteCarriedTrafficMap (this.currentNetPlan));
210                this.stat_traceOf_ye.add(0.0, currentNetPlan.getVectorLinkTotalCarriedTraffic());
211                this.stat_traceOf_queueSizes.add(0.0, copyOf(this.ctlNumPacketsQueue_nd));
212                this.stat_traceOf_objFunction.add(0.0, computeObjectiveFunctionFromNetPlan(this.currentNetPlan));
213                this.stat_traceOf_hd.add(0.0, currentNetPlan.getVectorDemandOfferedTraffic());
214                
215                /* CONG CONTROL */
216                /* Set the initial prices in the links: 1.0 */
217                /* Initialize the information each demand knows of the prices of all the links */
218                /* Compute the traffic each demand injects, set it in net2plan, and tell routing layer there was an update in this */
219                for (Demand d : currentNetPlan.getDemands())
220                        d.setOfferedTraffic(this.computeHdFromPrices(d));
221                
222                /* Initially all nodes receive a "wake up to transmit" event, aligned at time zero or y asynchr => randomly chosen */
223                for (Demand d : currentNetPlan.getDemands())
224                {
225                        final double updateTime = (cc_update_isSynchronous.getBoolean())? cc_update_averageInterUpdateTime.getDouble() : Math.max(0 , cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
226                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE , d));
227                }
228        }
229
230        @Override
231        public void processEvent(NetPlan currentNetPlan, SimEvent event)
232        {
233                final double time = event.getEventTime();
234                switch (event.getEventType())
235                {
236                case UPDATE_STATISTICTRACES:
237                {
238                        for (Route r : this.currentNetPlan.getRoutes())
239                        {
240                                final int numPacketsArrivedDestinationLastInterval = stat_mapRouteId2CarriedPacketsLastInterval.get(r);
241                                final double trafficArrivedDestinationLastInterval = numPacketsArrivedDestinationLastInterval * routing_numTrafficUnitsOfOnePacket.getDouble() / (routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt ());
242                                r.setCarriedTraffic(trafficArrivedDestinationLastInterval , trafficArrivedDestinationLastInterval);
243                                /* Set zero in the counter of packets */
244                                stat_mapRouteId2CarriedPacketsLastInterval.put(r,0);
245                        }
246                        
247                        /* Update the traces */
248                        this.stat_traceOf_xp.add(time , netPlanRouteCarriedTrafficMap (this.currentNetPlan));
249                        this.stat_traceOf_ye.add(time, currentNetPlan.getVectorLinkTotalCarriedTraffic());
250                        this.stat_traceOf_objFunction.add(time, computeObjectiveFunctionFromNetPlan(this.currentNetPlan));
251                        final double scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic = this.routing_numTrafficUnitsOfOnePacket.getDouble() / this.routing_statNumSchedSlotBetweenN2PRecomputing.getInt ();
252                        /* We store the average queue sizes in traffic units */
253                        this.stat_traceOf_queueSizes.add(time, scaledCopyOf(this.stat_accumNumQeueusPackets_nd , scaleFactorAccumNumQueuePacketsToAverageQueuedTraffic));
254                        this.stat_accumNumQeueusPackets_nd = new int [N][D]; // reset the count
255                        this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_STATISTICTRACES , -1));
256                        break;
257                }
258                
259                case SIGNALING_RECEIVEDMESSAGE: // A node receives from an out neighbor the q_nd for any demand
260                {
261                        final Triple<Node,Node,int []> signalInfo = (Triple<Node,Node,int []>) event.getEventObject();
262                        final Node nMe = signalInfo.getFirst();
263                        final int index_nIdMe = nMe.getIndex ();
264                        final Node nNeighbor = signalInfo.getSecond();
265                        final int index_nNeighbor = nNeighbor.getIndex ();
266                        final int [] receivedInfo_q_d = signalInfo.getThird();
267                        for (int index_d = 0 ; index_d < D ; index_d ++)
268                                ctlMostUpdatedQndKnownByNodeN1_n1nd.set(index_nIdMe, index_nNeighbor, index_d, receivedInfo_q_d[index_d]);
269                        break;
270
271                }
272                
273                case SIGNALING_WAKEUPTOSENDMESSAGE: // A node broadcasts signaling info to its 1 hop neighbors
274                {
275                        final Node nMe = (Node) event.getEventObject();
276                        final int index_nMe = nMe.getIndex ();
277                        
278                        log.println("t=" + time + ": ROUTING_NODEWAKEUPTO_TRANSMITSIGNAL node " + nMe);
279
280                        /* Create the info I will signal */
281                        int [] infoToSignal_q_d = Arrays.copyOf(ctlNumPacketsQueue_nd [index_nMe], D);
282
283                        /* Send the events of the signaling information messages to incoming neighbors */
284                        for (Node n : nMe.getInNeighbors())
285                        {
286                                if (rng.nextDouble() >= this.signaling_signalingLossProbability.getDouble()) // the signaling may be lost => lost to all nodes
287                                {
288                                        final double signalingReceptionTime = time + Math.max(0 , signaling_averageDelay.getDouble() + signaling_maxFluctuationInDelay.getDouble() * (rng.nextDouble() - 0.5));
289                                        this.scheduleEvent(new SimEvent (signalingReceptionTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_RECEIVEDMESSAGE , Triple.of(n , nMe , infoToSignal_q_d)));
290                                }
291                        }
292                        
293                        /* Re-schedule when to wake up again */ 
294                        final double signalingTime = signaling_isSynchronous.getBoolean()? time + signaling_averageInterMessageTime.getDouble() : Math.max(time , time + signaling_averageInterMessageTime.getDouble() + signaling_maxFluctuationInterMessageTime.getDouble() * (rng.nextDouble() - 0.5));
295                        this.scheduleEvent(new SimEvent (signalingTime , SimEvent.DestinationModule.EVENT_PROCESSOR , SIGNALING_WAKEUPTOSENDMESSAGE , nMe));
296                        break;
297                }
298
299                case UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS: // a node updates its p_n, p_e values, using most updated info available
300                {
301                        log.println("t=" + time + ": UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS");
302
303                        /* First, create traffic of the demands. A constant amount */
304                        for (Demand d : this.currentNetPlan.getDemands())
305                        {
306                                final double avgNumPackets_d = d.getOfferedTraffic() / routing_numTrafficUnitsOfOnePacket.getDouble();
307                                final int numPackets_d = Poisson.staticNextInt(avgNumPackets_d);
308                                final Node a_d = d.getIngressNode();
309                                LinkedList<PacketInfo> q = this.ctlQueue_nd.get(Pair.of(a_d, d));
310                                for (int cont = 0 ; cont < numPackets_d ; cont ++)
311                                        q.add (new PacketInfo (d , time));
312                                this.ctlNumPacketsQueue_nd [a_d.getIndex ()][d.getIndex ()] += numPackets_d;
313                                this.stat_accumNumGeneratedPackets_d [d.getIndex ()] += numPackets_d;
314                        }
315
316                        /* Forwarding decisions for each link. Each link can transmit as many packets as its capacity divided by trafic per packet factor (floor), during this scheduling */
317                        int [][] numPacketsToForward_ed = new int [E][D];
318                        int [][] numPacketsToForward_nd = new int [N][D];
319                        for (int index_e = 0 ; index_e  < E ; index_e ++)
320                        {
321                                final Link e = currentNetPlan.getLink (index_e);
322                                final int maxNumPacketsToTransmit = (int) Math.floor(e.getCapacity() / this.routing_numTrafficUnitsOfOnePacket.getDouble());
323                                final Node a_e = e.getOriginNode();
324                                final Node b_e = e.getDestinationNode();
325                                final int index_ae = a_e.getIndex ();
326                                final int index_be = b_e.getIndex ();
327                                for (int cont = 0 ; cont < maxNumPacketsToTransmit ; cont ++)
328                                {
329                                        /* Take the queue with higher */
330                                        double highestBackpressure = -1;
331                                        int indexDemandToTransmitPacket = -1;
332                                        for (int index_d = 0 ; index_d < D ; index_d ++)
333                                        {
334                                                final Demand d = currentNetPlan.getDemand (index_d);
335                                                final int thisQueueSize = ctlNumPacketsQueue_nd [index_ae][index_d];
336                                                if (thisQueueSize != ctlQueue_nd.get(Pair.of(a_e, d)).size ()) throw new RuntimeException ("Bad"); 
337                                                final int neighborQueueSize = ctlMostUpdatedQndKnownByNodeN1_n1nd.get(index_ae,index_be,index_d);
338                                                final double backpressure = routing_gradient_gammaStep.getDouble() * (thisQueueSize - numPacketsToForward_nd[index_ae][index_d] - neighborQueueSize);
339                                                if (backpressure > highestBackpressure) { indexDemandToTransmitPacket = index_d; highestBackpressure = backpressure; } 
340                                        }
341                                        if (highestBackpressure <= 0) break; // no more queues will be empty, not enough backpressures
342                                        
343                                        numPacketsToForward_ed[index_e][indexDemandToTransmitPacket] ++;
344                                        numPacketsToForward_nd[index_ae][indexDemandToTransmitPacket] ++;
345                                }
346                        }
347
348                        /* Apply the forwarding decisions */
349                        for (int index_e = 0 ; index_e  < E ; index_e ++)
350                        {
351                                final Link e = currentNetPlan.getLink (index_e);
352                                final Node a_e = e.getOriginNode();
353                                final Node b_e = e.getDestinationNode();
354                                final int index_ae = a_e.getIndex ();
355                                final int index_be = b_e.getIndex ();
356                                for (int index_d = 0 ; index_d < D ; index_d ++)
357                                {
358                                        final Demand d = currentNetPlan.getDemand (index_d);
359                                        final int numPackets_ed = numPacketsToForward_ed [index_e][index_d];
360
361                                        for (int cont = 0 ; cont < numPackets_ed ; cont ++)
362                                        {
363                                                //System.out.println("Pop e: " + e + ", d: " + d + ", numpack quque: " + this.ctlQueue_nd.get(Pair.of(a_e,d)).size() );
364
365                                                PacketInfo p = this.ctlQueue_nd.get(Pair.of(a_e,d)).pop();
366                                                ctlNumPacketsQueue_nd [index_ae][index_d] --;
367                                                if (this.ctlQueue_nd.get(Pair.of(a_e,d)).size() != ctlNumPacketsQueue_nd [index_ae][index_d]) throw new RuntimeException ("Bad");
368                                                p.seqLinks.add(e); // update the seq of links
369                                                if (b_e == d.getEgressNode())
370                                                {
371                                                        /* Packet arrived to destination: update the statistics */
372                                                        Route route = stat_mapSeqLinks2RouteId.get(p.seqLinks);
373                                                        
374                                                        /* If the route does not exist, create it */
375                                                        if (route == null) 
376                                                        {
377                                                                route = this.currentNetPlan.addRoute(d, 0.0 , 0.0 , p.seqLinks, null);
378                                                                stat_mapSeqLinks2RouteId.put(p.seqLinks , route);
379                                                                stat_mapRouteId2CarriedPacketsLastInterval.put(route, 1);
380                                                        }
381                                                        else
382                                                        {
383                                                                stat_mapRouteId2CarriedPacketsLastInterval.put(route, stat_mapRouteId2CarriedPacketsLastInterval.get(route) + 1);
384                                                                this.stat_accumNumReceivedPackets_d [index_d] ++;
385                                                        }
386                                                }
387                                                else
388                                                {
389                                                        /* Not the end node => put it in the next queue */
390                                                        this.ctlQueue_nd.get(Pair.of(b_e,d)).addLast(p);
391                                                        ctlNumPacketsQueue_nd [index_be][index_d] ++;
392                                                }
393                                        }
394                                }
395                                
396                        }
397                        
398                        /* Update the statistics of accum number of packets in a queue */
399                        for (int index_n = 0 ; index_n < N ; index_n ++)
400                                for (int index_d = 0 ; index_d < D ; index_d ++)
401                                        stat_accumNumQeueusPackets_nd [index_n][index_d] += this.ctlNumPacketsQueue_nd [index_n][index_d];
402                        
403                        this.scheduleEvent(new SimEvent (time + routing_fixedPacketDurationAndSchedulingInterval.getDouble() , SimEvent.DestinationModule.EVENT_PROCESSOR , UPDATE_INITTIMESLOTFORSCHEDULINGPACKETS , -1));
404                        
405                        if (time > this.routing_maxNumberOfN2PStatisticsIntervals.getInt() * routing_statNumSchedSlotBetweenN2PRecomputing.getInt() * this.routing_fixedPacketDurationAndSchedulingInterval.getDouble()) { this.endSimulation (); }
406
407                        break;
408                }
409                        
410                case CC_UPDATE_WAKEUPTOUPDATE: // a node updates its p_n, p_e values, using most updated info available
411                {
412                        final Demand dMe = (Demand) event.getEventObject();
413                        final double new_hd = computeHdFromPrices (dMe);
414                        dMe.setOfferedTraffic(new_hd);
415                        
416                        final double updateTime = cc_update_isSynchronous.getBoolean()? time + cc_update_averageInterUpdateTime.getDouble() : Math.max(time , time + cc_update_averageInterUpdateTime.getDouble() + cc_update_maxFluctuationInterUpdateTime.getDouble() * (rng.nextDouble() - 0.5));
417                        this.scheduleEvent(new SimEvent (updateTime , SimEvent.DestinationModule.EVENT_PROCESSOR , CC_UPDATE_WAKEUPTOUPDATE,  dMe));
418
419                        this.stat_traceOf_hd.add(time, currentNetPlan.getVectorDemandOfferedTraffic());
420
421                        if (time > this.cc_simulation_maxNumberOfUpdateIntervals.getDouble() * this.cc_update_averageInterUpdateTime.getDouble()) { this.endSimulation (); }
422                        
423                        break;
424                }
425
426                default: throw new RuntimeException ("Unexpected received event");
427                }
428                
429                
430        }
431
432        public String finish (StringBuilder st , double simTime)
433        {
434                System.out.println("stat_accumNumGeneratedPackets_d: " + Arrays.toString(stat_accumNumGeneratedPackets_d));
435                System.out.println("stat_accumNumReceivedPackets_d: " + Arrays.toString(stat_accumNumReceivedPackets_d));
436                System.out.println("transmittedNotReceived: " + Arrays.toString(IntUtils.substract(stat_accumNumGeneratedPackets_d, stat_accumNumReceivedPackets_d)));
437
438                /* If no output file, return */
439                if (simulation_outFileNameRoot.getString().equals("")) return null;
440                
441                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_objFunc.txt"), new double [] { computeObjectiveFunctionFromNetPlan (optNetPlan.getFirst()) } );
442                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_xp.txt"), optNetPlan.getFirst().getVectorRouteCarriedTraffic());
443                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_hd.txt"), optNetPlan.getFirst().getVectorDemandOfferedTraffic());
444                TimeTrace.printToFile(new File (simulation_outFileNameRoot.getString() + "_jom_qnd.txt"), DoubleUtils.mult(optNetPlan.getSecond(), 1/routing_gradient_gammaStep.getDouble()));
445                this.stat_traceOf_queueSizes.printToFile(new File (simulation_outFileNameRoot.getString() + "_qnd.txt"));
446                this.stat_traceOf_objFunction.printToFile(new File (simulation_outFileNameRoot.getString() + "_objFunc.txt"));
447                this.stat_traceOf_ye.printToFile(new File (simulation_outFileNameRoot.getString() + "_ye.txt"));
448                for (Pair<Double,Object> sample : this.stat_traceOf_xp.getList())
449                {
450                        Map<Long,Double> x_p = (Map<Long,Double>) sample.getSecond ();
451                        for (long r: this.currentNetPlan.getRouteIds())
452                                if (!x_p.containsKey(r)) x_p.put(r , 0.0); 
453                }
454                this.stat_traceOf_xp.printToFile(new File (simulation_outFileNameRoot.getString() + "_xp.txt"));
455                this.stat_traceOf_hd.printToFile(new File (simulation_outFileNameRoot.getString() + "_hd.txt"));
456                
457                
458                return null;
459        }
460        
461        private Pair<NetPlan,double[][]> computeOptimumSolution ()
462        {
463                /* Modify the map so that it is the pojection where all elements sum h_d, and are non-negative */
464                final int D = this.currentNetPlan.getNumberOfDemands();
465                final int E = this.currentNetPlan.getNumberOfLinks();
466                final DoubleMatrix1D u_e = this.currentNetPlan.getVectorLinkCapacity();
467                                
468                
469                OptimizationProblem op = new OptimizationProblem();
470                op.addDecisionVariable("x_de", false , new int[] { D , E }, 0 , u_e.getMaxLocation() [0]);
471                op.addDecisionVariable("h_d", false, new int[] {1, D}, cc_control_minHd.getDouble() , cc_control_maxHd.getDouble());
472
473                op.setInputParameter("u_e", u_e , "row");
474                op.setInputParameter("alpha", this.cc_control_fairnessFactor.getDouble());
475
476                /* Set some input parameters */
477                /* Sets the objective function */
478                if (cc_control_fairnessFactor.getDouble() == 1)
479                    op.setObjectiveFunction("maximize", "sum(ln(h_d))");
480                else if (cc_control_fairnessFactor.getDouble() == 0)
481                    op.setObjectiveFunction("maximize", "sum(h_d)");
482                else
483                    op.setObjectiveFunction("maximize", "(1-alpha) * sum(h_d ^ (1-alpha))");
484
485                op.setInputParameter("A_ne", currentNetPlan.getMatrixNodeLinkIncidence());
486                op.setInputParameter("A_nd", currentNetPlan.getMatrixNodeDemandIncidence());
487                op.addConstraint("A_ne * (x_de') >= A_nd * diag(h_d)" , "flowConservationConstraints"); // the flow-conservation constraints (NxD constraints)
488                op.addConstraint("sum(x_de,1) <= u_e"); // the capacity constraints (E constraints)
489                
490                /* Call the solver to solve the problem */
491                op.solve("ipopt");
492
493                /* If an optimal solution was not found, quit */
494                if (!op.solutionIsOptimal ()) throw new RuntimeException ("An optimal solution was not found");
495        
496                /* Retrieve the optimum solutions. Convert the bps into Erlangs */
497                final DoubleMatrix1D h_d_array = op.getPrimalSolution("h_d").view1D ();
498                final DoubleMatrix2D x_de_array = op.getPrimalSolution("x_de").view2D ();
499                final double [][] q_nd_array = (double [][]) op.getMultipliersOfConstraint("flowConservationConstraints").assign(DoubleFunctions.abs).toArray();
500
501                /* Convert the x_de variables into a set of routes for each demand  */
502                NetPlan np = this.currentNetPlan.copy();
503                np.removeAllUnicastRoutingInformation();
504                np.setVectorDemandOfferedTraffic(h_d_array);
505                np.setRoutingFromDemandLinkCarriedTraffic(x_de_array , false , false);
506
507                /* Check solution: all traffic is carried, no link oversubscribed */
508                for (Demand d : np.getDemands())
509                {
510                        final double thisDemand_hd = d.getOfferedTraffic();
511                        if (Math.abs (d.getCarriedTraffic() - d.getOfferedTraffic()) > 1e-3) throw new RuntimeException ("Bad");
512                        if (thisDemand_hd < cc_control_minHd.getDouble() - 1E-3) throw new RuntimeException ("Bad");
513                        if (thisDemand_hd > cc_control_maxHd.getDouble() + 1E-3) throw new RuntimeException ("Bad");
514                }
515                if (np.getVectorLinkUtilizationIncludingProtectionSegments().getMaxLocation() [0] > 1.001) throw new RuntimeException ("Bad");
516
517                return Pair.of(np,q_nd_array);
518        }
519        
520        
521        private class PacketInfo
522        {
523                final Demand d;
524                final double packetCreationTime; 
525                LinkedList<Link> seqLinks;
526                PacketInfo (Demand d , double creationTime) { this.d = d; this.packetCreationTime = creationTime; this.seqLinks = new LinkedList<Link> (); }
527        }
528        private int [][] copyOf (int [][] a) { int [][] res = new int [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; }
529        private double [][] copyOf (double [][] a) { double [][] res = new double [a.length][]; for (int cont = 0 ; cont < a.length ; cont ++) res [cont] = Arrays.copyOf(a[cont], a[cont].length); return res; }
530        private double [][] scaledCopyOf (int [][] a , double mult) { double [][] res = new double [a.length][a[0].length]; for (int cont = 0 ; cont < a.length ; cont ++) for (int cont2 = 0; cont2 < a[cont].length ; cont2 ++) res [cont][cont2] = a[cont][cont2] * mult; return res; }
531        
532        
533        private double computeObjectiveFunctionFromNetPlan (NetPlan np)
534        {
535                double objFunc = 0;
536                for (Demand d : np.getDemands())
537                {
538                        final double h_d = d.getOfferedTraffic();
539                        objFunc += (this.cc_control_fairnessFactor.getDouble() == 1)? Math.log(h_d) : Math.pow(h_d, 1-this.cc_control_fairnessFactor.getDouble()) / (1-this.cc_control_fairnessFactor.getDouble());
540                }
541                return objFunc;
542        }
543        
544        private double computeHdFromPrices (Demand d)
545        {
546                /* compute the demand price as weighted sum in the routes of route prices  */
547                final int index_ad = d.getIngressNode().getIndex ();
548                final int index_d = d.getIndex ();
549                final double demandInitialNodeQueueSize = this.routing_numTrafficUnitsOfOnePacket.getDouble() * ((double) this.ctlNumPacketsQueue_nd [index_ad][index_d]) * this.routing_gradient_gammaStep.getDouble();
550                /* compute the new h_d */
551                final double new_hd = Math.max(this.cc_control_minHd.getDouble() , Math.min(this.cc_control_maxHd.getDouble(), Math.pow(demandInitialNodeQueueSize, -1/this.cc_control_fairnessFactor.getDouble())));
552
553                return new_hd;
554        }
555
556        private Map<Long,Double> netPlanRouteCarriedTrafficMap (NetPlan np) { Map<Long,Double> res = new HashMap<Long,Double> (); for (Route r : np.getRoutes ()) res.put (r.getId () , r.getCarriedTraffic()); return res; }
557
558}