Skip to content
Snippets Groups Projects
AvatarTransitionServer.sysml 28.9 KiB
Newer Older
// AVATAR CENTRALIZED TRANSITION SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
package AvatarTransitionServer {
    private import Clocks::TimeOf;
    private import Time::TimeInstantValue;
    private import AvatarGeneral::*;
    private import AvatarCommunication::*;
    public import Occurrences::Occurrence;

	doc /* The server is used to select a transition to execute among the possible outgoing transitions 
	 * of a state of an Avatar block statemachine. For this, states will submit pools of requests to 
	 * the server, where each request is associated to one of its outgoing transitions. Then, as soon
	 * as some requests in the pool are feasible, the server chose one of them and send a response to
	 * the state identifying the selected request and providing the associated '@payload' (transmitted 
	 * message for receiving requests, and acknowledgment in other cases)
	 */
	 
// AVATAR TRANSITION REQUESTS ============================================

	abstract part def '#Request' {
		doc /* The most general request type. Outgoing transition from a state are statically ordered.
		 * Thus associated requests have an index w.r.t this order, which identifies the request they
		 * are associated to.
		 */
	
    	readonly attribute '@index' : Positive default 1;
    	
    	comment Request_Partition about '#OKrequest', '#NOKrequest', '#Request'
    	/* Requests are either certainly unfeasible (a #NOKrequest has false guard)
     	 * or potentially feasible (an #OKrequest has true guard)
     	 * #OKrequest, #NOKrequest is a partition of #Request
     	 */
    }
    
	part def '#NOKrequest' :> '#Request' {
		doc /* Certainly unfeasible requests, i.e. requests with false guard.
		 * they cannot be selected by the centralized server.
		 */
	}

	abstract part def '#OKrequest' :> '#Request' {
		doc /* Potentially feasible requests, i.e. requests with true guard.
		     * they may be selected by the centralized server.
   			 * They may be '@delay'ed by an "after", thus they have a trigger date w.r.t
   			 * the system discrete clock.
   			 */
	    
	    item localClock : Clock redefines localClock = '#systemClock' ;
	    readonly attribute '@delay' : Positive default 0;
	    derived attribute trigger_at = TimeOf(clock=localClock, o=self) + '@delay';

		comment OKrequest_Partition about '#OKrequest', '#TrivialRequest', '#CommunicationRequest'
    		/* OK requests may be without dependency and thus Trivialially feasible (#TrivialRequest). 
    		 * Non Trivialial requests are the ones associated to communication (#CommunicationRequest).
    		 * They may require synchronization, or channel or data to be available.
    		 * #TrivialRequest, #CommunicationRequest, is a partition of #OKrequest 
    		 */
	}
    
	part def '#TrivialRequest' :> '#OKrequest'{
		doc /* Trivialial requests don't communicate, thus don't block. Associated transitions
	     * only execute actions such as assignment, external method calls,...
	     */
	}
	
	item '#immediate_request' : '#TrivialRequest' = '#TrivialRequest'();
	
	abstract part def '#CommunicationRequest' :> '#OKrequest' {
		doc /* Communication requests, which have an associated communication channel.
		 * They may block until some ressource is available, depending of the kind of channel
		 */

    	readonly item '@channel' : '#Channel';
    	readonly attribute is_send : Boolean {
    		doc /* request direction, which is redoundant with #CommunicationRequest 
    		 * specializations but useful to factor some descriptions in the model
    		 */
		}

		comment about '#CommunicationRequest', '#SendRequest', '#ReceiveRequest'
		    /* communication requests have a direction: they are "send" or "receive". 
		     * #SendRequest, #ReceiveRequest, is a partition of #CommunicationRequest
		     */
    }

	part def '#SendRequest' :> '#CommunicationRequest' {
		doc /* Requests for sending. They have have a '@payload' (message) */
	    readonly item '@payload' : '#Message';
	    readonly attribute is_send redefines is_send = true;
	}
	
	part def '#ReceiveRequest' :> '#CommunicationRequest' {
		doc /* Requests for receiving */
		
	    readonly attribute is_send redefines is_send = false;
	}
	
	part def '#RequestPool' {
		doc /* Pools of requests. They contain one request per outgoing transition of the
		 * state they are associated to. They have an associated requestor identifying 
		 * who to send the response to (generally the block whose statemachine contains 
		 * the state)
		 */
		 
	    occurrence requestor : Occurrence;
	    item pool : '#Request'[0..*] ordered default := '#immediate_request';
	    
	    assert constraint {
	    	doc /* Request indexes correspond to their range in the pool. This is a redoundancy which
	    	 * makes the model more simple. 
	    	 * Moreover, all transitions from the concerned state in Avatar model have a corresponding 
	    	 * request , which cannot be expressed here as it is a constraint on the software which 
	    	 * generates the Avatar SysMLV2 model
	    	 */
	    	calc test {
	    	 	first start;
	    	 	then assign result := true;
	    	 	then for i in 1..size(pool) { 
	    	 		first start;
	    			then assign result := result & pool#(i).'@index' == i; 
	    		}
	    	}
	    	test()
	    }
		 
	}

// AVATAR RESPONSES TO REQUESTS ============================================
	item def '#Response' {
		doc /* Response to submitted request pool.
		 * Once a request in a pool is satisfied, the server sends a response to
	 	 * the requestor associated to the pool. This response identifies the
	 	 * (index of the transition corresponding to the) satisfied request in 
	 	 * the pool and carry some '@payload'. A '@payload' is a message (which can be
	 	 * a simple acknowledgement).
	 	 * Remark: As a resquestor should not have more than one pending request
	 	 * pool in the server, this information is sufficient to root the response
	 	 * to a unique well identified waiting accept action.
	 	 */
		
	    attribute '@index' : Positive default 1;
	    item '@payload' : '#Message' default '#ack_message';
	}
	item '#default_response' = '#Response'();

// THE SERVER ITSELF ========================================================

	part '#transitionRequestServer' {
		doc /* The transition request server. 
		 * General Principle: all submitted requests are initially considered as 
		 * '@delay'ed (even if their '@delay' is 0, for homogeneity). '@delay'ed requests
		 * wake up themself at their "trigger_at" date and launch the server's
		 * request handler on themself. The handler try to execute the request 
		 * which may lead either to successfull or blocking issue.
		 * A request wake up once and only once. After this, if it is blocked 
		 * (pending communication), it remains in the pending requests but is 
		 * removed from the '@delay'ed requests. Such requests may then be unblocked 
		 * by the wake-up of a dual request (a send for a receive or a receive for 
		 * a send), or discarded if some other request in their pool is selected 
		 * to be executed. Pools may also wait forever...
		 * 
		 * The server maintains two sets of pending pools:
		 * - general pending pools are all pending pools, containing all 
		 *   pending requests
		 * - '@delay'ed pending pools (each one associated to a general pending pools)
		 *   are pools containing '@delay'ed requests, and only '@delay'ed requests.
		 * 
		 * When a pool is submitted by a state, it is added to both sets. 
		 * When a '@delay'ed request wake up, it is removed from its '@delay'ed pool (which 
		 * is removed when becomming empty -- facultatif) and if handling 
		 * this request leads to a success, all involved pools are removed from
		 * both sets and relevant responses are send to the involved communicating 
		 * ends (which pools and which responses depends on channel types).
		 */
		 
		item localClock : Clock redefines localClock = '#systemClock' ;
		private individual item def Kill { doc /* type of the local kill signal */ }
		private individual item kill : Kill {
		 	doc /* The local kill signal to be sent to '@delay'ed requests when 
		 	 * their pool is deleted because some other request in the pool 
		 	 * is executed
		 	 */
		}
		private item mutex : '#Lock';
    	private item request_pools : '#RequestPool'[0..*] ordered {
    	 	doc /* general pending pools containing requests waiting for wake up 
    	 	 * or available ressources
    	 	 */
    	}
   		protected item delayed_pools : '#delayedPool'[0..*] ordered {
    		doc /* '@delay'ed pending pools containing requests waiting for wake up */
		part def '#delayedPool' {
			doc /* Type of pools containing '@delay'ed requests. Such pools are
			 *  associated to the general request pool that contain their requests.
			 */
    		item request_pool : '#RequestPool';
    		item delayed_requests : '#delayedRequest'[0..*] ordered;
		part def '#delayedRequest' {
			doc /* (operational) structure for '@delay'ed requests */

    		attribute trigger_at : Positive, TimeInstantValue { doc /* The date at which to wake up */ }
    		item delayed_pool : '#delayedPool' { doc /* the '@delay'ed pool the request belongs to */}
    		attribute '@index' : Positive;
    		
    		private readonly item myself = self; // technical... to be referenced in sub-components
		    protected item delayed_pools : '#delayedPool'[0..*] ordered = '#transitionRequestServer'.delayed_pools;
		    // apparently required !?!
    
		    state statemachine { 
		    	doc /* waits for wake up or kill */
		        entry; then state wait;
		        transition first wait accept Kill then done;
		        transition first wait accept at trigger_at
		          do action {
		            perform action remove_request subsets 
		            	'#multiset_remove' {
		                	in multiset redefines multiset = delayed_pool.delayed_requests; 
		                    in pred redefines pred = '#identity'(myself); 
		                 }
                    then if delayed_pool.delayed_requests == null {
 		            	 	perform action remove_delayed_pool subsets 
		            		'#multiset_remove' {
		                		in multiset redefines multiset = delayed_pools; 
		                    	in pred redefines pred = '#identity'(delayed_pool); 
		                 	} then done;
		                 }
		            then action wake_up subsets '#transitionRequestServer'.request_wake_up { 
		                        in request_pool redefines request_pool = delayed_pool.request_pool; 
		                        in transition_id redefines transition_id = transition_id; 
		                 }
           		  }
				  then done;
		    }
		    first statemachine then done;
		}
		
		// FOR SUBMITTING A REQUEST POOL ==========================================

		action submit { // add parameter pool to request_pools and delayed_pools
			in request_pool : '#RequestPool';
			part delayed_pool : '#delayedPool' { 
     			item request_pool : '#RequestPool' redefines request_pool = request_pool;
    			item delayed_requests : '#delayedRequest'[0..*] redefines delayed_requests := null;
    		}
    		first start; then perform mutex.lock;
    		for i in 1..size(request_pool.pool) {
    			if request_pool.pool#(i) hastype '#OKrequest' { // build '@delay'ed request for ith request
    				item request := request_pool.pool#(i) as '#OKrequest';
    				part delayed_request : '#delayedRequest' {
    					in trigger_at redefines trigger_at = request.trigger_at;
    					in delayed_pool redefines delayed_pool = delayed_pool;
    					in '@index' redefines '@index' = request.'@index';
    				first start; then // add '@delay'ed request
    				assign delayed_pool.delayed_requests := (delayed_pool.delayed_requests,delayed_request);
    				then delayed_request.start;
    			}
    		} 
    		then assign request_pools := (request_pools, request_pool); // add general request pool
    		then if delayed_pools.delayed_requests != null {
    			assign delayed_pools := (delayed_pools, delayed_pool);
    		} // add '@delay'ed request pool if not empty
    		then perform mutex.unlock;
    		then done;
		}
		
		// REQUEST WAKE UP: tries request synchronization if request has not already been handled
		
		comment about request, request_pool /* ALGORITHMIC VARIABLES: inputs for trying a synchronization */
    	protected part request : '#Request'{ doc /* ALGORITHMIC VARIABLE : the request to be synchronized */ }
    	protected part request_pool : '#RequestPool'{ doc /* tALGORITHMIC VARIABLE :he pool of the request to be synchronized */ }

		action request_wake_up { 
			doc /* verifies that request has not already been synchronized and if not, try to synchronize it */
		    in request_pool : '#RequestPool'; 
		    in transition_id : Positive;
		    
		    first start; then perform mutex.lock;
		    then perform action todo subsets '#multiset_exists'{ 
		    	in multiset redefines multiset = request_pools; 
		    	in pred redefines pred = '#identity'(request_pool); }
		    then if todo.result { 
		    	first start;
		    	then assign request := request_pool#(transition_id);
		    	then assign request_pool := request_pool;
		    	then perform try_request_synchronization;
		    }
		    then perform mutex.unlock;
		    then done;
		}

        action try_request_synchronization {
            first start;
            then perform search_request_synchronization;
            then perform remove_request_pools;
            then perform send_mails;
            then done;
		}
            
		// SUBROUTINE : removing synchronized pending pools =========================
		
		item pools_to_remove : '#RequestPool'[0..2] { 
			doc /* ALGORITHMIC VARIABLE: synchronized pools, i.e associated to synchronized
			 * requests. Updated when finding a synchronization. Removed from pending  pools
			 * when executing this synchronization.
			 */
		}

	    protected action remove_request_pools { 
	    	doc /* Removing pools in pools_to_remove, i.e removing them from both sets of
	    	 * pending pools (general and '@delay'ed). This also kills the process in '@delay'ed 
	    	 * requests which are waiting for either a trigger signal or a kill signal.
	    	 */
	    	 
	        // subroutine
            action kill_delayed_pool_requests : '#Unary_action' { 
            	doc /* kill all waiting requests of a parameter '@delay'ed pool */
                in parameter : '#delayedPool';
                first start;
                then for r in parameter.delayed_requests { first start; then send kill to r; }
            }

			first start;
	        then for p : '#RequestPool' in pools_to_remove {
	        	
	        	// subroutine
		        calc has_request_pool : '#Unary_predicate' { 
		        	doc /* test if the request pool of a parameter '@delay'ed pool is p */
		        	in parameter : '#delayedPool'; parameter.request_pool == p
		        }
		        
	            first start;
	            then action remove_request_pool subsets '#multiset_remove' { // remove pool from general pending pools
	                in multiset redefines multiset = request_pools; 
	                in pred redefines pred = '#identity'(p); 
	            }
				then action remove_delayed_pool subsets  '#multiset_iter' { 
	                in multiset redefines multiset = delayed_pools; 
	                in pred redefines pred = has_request_pool; 
	                in if_act redefines if_act = kill_delayed_pool_requests; 
	            }
	            succession flow remove_delayed_pool.discarded to update_delayed_pools.new_delayed_pools;
	            action update_delayed_pools {
	                in new_delayed_pools  : '#RequestPool'[0..*];
	                first start; then assign delayed_pools := new_delayed_pools; then done;
	            }
	        }
	        then done;
	    }
		
		// SUBROUTINE: sending responses to requestors =============================

	    protected part def '#Mail' { 
	    	doc /* Response computed while finding synchronizations have an associated recipient,
	    	 * This characterizes mails.
	    	 */
	        item recipient: Occurrence;
	        item response: '#Response';
	    }
	    
        item mails_to_send : '#Mail'[0..2] {
        	doc /* ALGORITHMIC VARIABLE: mails associated to synchronized requests. Updated 
        	 * when finding a synchronization. Responses are sent to recipients when executing 
        	 * this synchronization.
			 */
        }

    	protected action send_mails {
    		doc /* send responses to requestors of synchronized requests */
    		first start;
        	then for i : Positive in 1..size(mails_to_send) {
            	first start;
            	then send mails_to_send#(i).response to mails_to_send#(i).recipient;
            	then done;
        	}
        }

	    // SUBROUTINE searching a dual request for current request in pending requests ===============
	
	    private part dual_request : '#Request'[0..1]{
	    	doc /* selected dual pending request of the current request, 
	         * among the found ones
	         */
	    }
	    private part dual_request_pool : '#RequestPool'[0..1] {
	    	doc /* ALGORITHMIC VARIABLE: the pool containing the selected dual pending 
	    	 * request of the current request.
	    	 */
	    }
	
	    action search_pending_dual_request {
		    doc /* search for a dual request of current request in request_pools.
		     * updates dual_request and dual_request_pool accordingly. If no dual request 
		     * is found, they are set to null. If severall are found, one is choosed
		     * randomly.
		     * Dual requests are communication requests on a same channel with opposite 
		     * directions
		     */
			
	        comment about dual_requests, dual_request_pools
	            /* order in these two items are parallel: i-th request belongs to i-th pool */
	
	        part dual_requests : '#Request'[0..*] := null{
	        	doc /* found requests that are dual of current request */
			}
	        
	        part dual_request_pools : '#RequestPool'[0..*] := null {
	        	doc /* the pools containing dual requests in dual_requests, in the same order */
	        }
	               
	        first start;
	        then assign dual_requests := null;
	        then assign dual_request_pools := null;
	        then decide;
	        	if request hastype '#CommunicationRequest' then search; 
	        	else done;
	        	
	        item communication_request[0..1] : '#CommunicationRequest' := request as '#CommunicationRequest';
	        
	        action search assign dual_request := null;
	        then assign dual_request_pool := null;
	        then for p : '#RequestPool' in request_pools {
	            item found : '#Request'[0..*] := null; // dual requests in pool p
	            first start;
	            then for r : '#Request' in p.pool {
	            	if r hastype '#CommunicationRequest' {
	            		item req = r as '#CommunicationRequest';
	            		first start;
	                	then if communication_request.'@channel' == req.'@channel' and 
	                        	communication_request.is_send != req.is_send and
	                        	communication_request.trigger_at >= localClock.currentTime
	                     { first start; then assign found := (found,r); then done; }
	                }
	            }
	            then if found != null {
	            	first start;
	                then assign dual_request_pools := (dual_request_pools, p);
	                then assign dual_requests := (dual_requests,'#multiset_random'(found));
	                then done;
	            }
	            then done;
	        }
	        then if dual_requests != null { // randomly select one of the possible dual requests
	        		attribute x : Positive := '#bound_random'(low=1, high=size(dual_requests));
	            	first start;
	            	then assign dual_request_pool:= dual_request_pools#(x);
	                then assign dual_request:= dual_requests#(x);
	             }
	             else {
	                first start;
	                then assign dual_request_pool:= null;
	                then assign dual_request:= null;
	             }
	        then done;
	    }
	
	    // SUBROUTINE add an acknowledgement for a couple (pool,request)
	    
	    action acknowledge {
	    	doc /* add acknowledgement in mails_to_send and pools_to_remove */
	        in request_pool : '#RequestPool';
	        in request : '#Request';
	        first start;
	        then assign mails_to_send := ( mails_to_send,
	        	'#Mail'( recipient = request_pool.requestor,
	                     response = '#Response'('@index' = request.'@index',
	                                            '@payload' = '#ack_message') ) );
	        then assign pools_to_remove := (pools_to_remove, request_pool);
	    }
	
	    // SEARCH A SYNCHRONIZATION FOR request ==============================
	    
	    action search_request_synchronization { 
	    	doc /* search a synchronization for current request and updates mails_to_send
	    	 * and pools_to_remove accordingly. If no synchronization is found, both
	    	 * variables are set to null.
	    	 */
			first start;
			then assign mails_to_send := null;
			then assign pools_to_remove := null;
	        then decide select_wrt_request_type;
	            if request hastype '#NOKrequest' then done;
	            if request hastype '#TrivialRequest' then trivial_synchro;
	            else communication_synchro;
			
	        // Synchronization of Trivialial actions --------------------------------
	        action trivial_synchro subsets acknowledge {
	        	doc /* Trivialial actions are always synchronized (with no other pool) and simply return 
	        	 * an acknowledgment
	        	 */
	        	in request_pool redefines request_pool;
	        	in request redefines request;
	        }
	        then done; // end of trivial_synchro : search_request_synchronization returns
	        
	        // Synchronization of communication requests, depends on channel type ----
	        item communication_request[0..1] : '#CommunicationRequest' := request as '#CommunicationRequest';
	        decide communication_synchro;
	            if communication_request.'@channel' hastype '#Fifo' then fifo_synchro;
	            else sync_synchro;
	            
	        // Synchronization of communication requests on fifo channel ------------
	        action fifo_synchro {
	        	item fifo : '#Fifo' := communication_request.'@channel' as '#Fifo';
	        	first start;
	            then decide select_wrt_communication_direction;
	                if request hastype '#SendRequest' then send_synchro;
	                else receive_synchro;
	            
	            action send_synchro {
	            	item send_request : '#SendRequest' := communication_request as '#SendRequest';
	            	first start;
	                then decide test_if_lossy;
	                    if fifo.'@relation'.'@lossy' == true then lossy_synchro;
	                    else noloss_synchro;
	                
	                decide lossy_synchro; // loss or not, random choice
	                    then loss_synchro;
	                    then noloss_synchro;
	                    
	                action loss_synchro subsets acknowledge {
	                	in request_pool redefines request_pool = request_pool;
	                	in request redefines request = request;
	                }
	                then done;
	                
	                action noloss_synchro {
	                	first start;
	                    then decide test_if_full;
	                        if fifo.can_put() then put_synchro;
	                        else full_synchro;
	                    
	                    decide full_synchro; // test if blocking
	                        if fifo hastype '#NBfifo' then discard_synchro;
	                        else done;
	                               
		                action discard_synchro subsets acknowledge {
		                	in request_pool redefines request_pool = request_pool;
		                	in request redefines request = request;
		                }
		                then done;
		                
	                    action put_synchro {
	                        first start;
	                    	then action put subsets fifo.put {
	                    		in 'message' redefines 'message' = send_request.'@payload';
	                    	}
	                        then action ack subsets acknowledge {
	                			in request_pool redefines request_pool = request_pool;
	                			in request redefines request = request;
	                		}
	                        then perform search_pending_dual_request;
	                        then
	                        if dual_request != null {
	                            first start;
								then assign pools_to_remove := (pools_to_remove,dual_request_pool);
	                            then assign mails_to_send := (mails_to_send,
	                                 '#Mail'( recipient = dual_request_pool.requestor,
	                                          response = '#Response'( '@index' = dual_request.'@index',
	                                                                  '@payload' = fifo.get() )));
	                            then done;
							}
	                    } then done;
	                } then done; 
	            } then done; // end of send_synchro : fifo_synchro returns
	            
	            action receive_synchro {
	                first start;
	                then if fifo.can_get() {
	                    first start;
	                    then assign mails_to_send := 
	                         '#Mail'( recipient = request_pool.requestor,
	                                 response = '#Response'('@index' = communication_request.'@index',
	                                                        '@payload' = fifo.get() ));
	                    then assign pools_to_remove := request_pool;
	                    then perform search_pending_dual_request;
	                    then
	                    if dual_request != null {
	                        first start;
	                        then assign pools_to_remove := (pools_to_remove,dual_request_pool);
	                        then action ack subsets acknowledge {
	                            in request_pool redefines request_pool = dual_request_pool;
	                            in request redefines request = dual_request;
	                        } then done;
	                    } then done;
	                } then done;
	            } then done; // end of receive_synchro : fifo_synchro returns
	        } then done; // end of fifo_synchro : search_request_synchronization returns
	
	        // Synchronization of communication requests on synchroneous channel ------------
	        action sync_synchro {
	            first start;
	            then decide select_wrt_communication_direction;
	                if communication_request hastype '#SendRequest' then send_synchro;
	                else receive_synchro;
	            
	            action send_synchro {
	                first start;
	                then perform search_pending_dual_request;
	                then
	                if dual_request != null {
                    	item send_request : '#SendRequest' = request as '#SendRequest';
	                    first start;
	                    then action ack subsets acknowledge {
	                        in request_pool redefines request_pool = request_pool;
	                        in request redefines request = request;
	                    }
                        then assign mails_to_send := ( mails_to_send,
                                    '#Mail'( recipient = dual_request_pool.requestor,
                                            response = '#Response'(
                                                '@index' = dual_request.'@index',
                                                '@payload' = send_request.'@payload' )) );
                        then assign pools_to_remove := (pools_to_remove, dual_request_pool);
                        then done;
                    } then done;
	            } then done; // end of send_synchro : sync_synchro returns
	            
	            action receive_synchro {
	                first start;
	                then perform search_pending_dual_request;
                    then
                    if dual_request != null {
                    	item send_request : '#SendRequest' = dual_request as '#SendRequest';
                        first start;
                        then action ack subsets acknowledge {
                            in request_pool redefines request_pool = dual_request_pool;
                            in request redefines request = dual_request;
                        }
                        then assign mails_to_send := ( mails_to_send,
                                    '#Mail'( recipient = request_pool.requestor,
                                            response = '#Response'(
                                                '@index' = request.'@index',
                                                '@payload' = send_request.'@payload' )) );
                        then assign pools_to_remove := (pools_to_remove, request_pool);
                        then done;
                    } then done;
	            } then done; // end of receive_synchro : sync_synchro returns
	        } then done; // end of sync_synchro : search_request_synchronization returns
	    } // end of search_request_synchronization
	}
}