// AVATAR CENTRALIZED TRANSITION SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% package AvatarTransitionServer { private import Clocks::TimeOf; private import Time::TimeInstantValue; private import AvatarGeneral::*; private import AvatarCommunication::*; public import Occurrences::Occurrence; doc /* The server is used to select a transition to execute among the possible outgoing transitions * of a state of an Avatar block statemachine. For this, states will submit pools of requests to * the server, where each request is associated to one of its outgoing transitions. Then, as soon * as some requests in the pool are feasible, the server chose one of them and send a response to * the state identifying the selected request and providing the associated '@payload' (transmitted * message for receiving requests, and acknowledgment in other cases) */ // AVATAR TRANSITION REQUESTS ============================================ abstract part def '#Request' { doc /* The most general request type. Outgoing transition from a state are statically ordered. * Thus associated requests have an index w.r.t this order, which identifies the request they * are associated to. */ readonly attribute '@index' : Positive default 1; comment Request_Partition about '#OKrequest', '#NOKrequest', '#Request' /* Requests are either certainly unfeasible (a #NOKrequest has false guard) * or potentially feasible (an #OKrequest has true guard) * #OKrequest, #NOKrequest is a partition of #Request */ } part def '#NOKrequest' :> '#Request' { doc /* Certainly unfeasible requests, i.e. requests with false guard. * they cannot be selected by the centralized server. */ } abstract part def '#OKrequest' :> '#Request' { doc /* Potentially feasible requests, i.e. requests with true guard. * they may be selected by the centralized server. * They may be '@delay'ed by an "after", thus they have a trigger date w.r.t * the system discrete clock. */ item localClock : Clock redefines localClock = '#systemClock' ; readonly attribute '@delay' : Positive default 0; derived attribute trigger_at = TimeOf(clock=localClock, o=self) + '@delay'; comment OKrequest_Partition about '#OKrequest', '#TrivialRequest', '#CommunicationRequest' /* OK requests may be without dependency and thus Trivialially feasible (#TrivialRequest). * Non Trivialial requests are the ones associated to communication (#CommunicationRequest). * They may require synchronization, or channel or data to be available. * #TrivialRequest, #CommunicationRequest, is a partition of #OKrequest */ } part def '#TrivialRequest' :> '#OKrequest'{ doc /* Trivialial requests don't communicate, thus don't block. Associated transitions * only execute actions such as assignment, external method calls,... */ } item '#immediate_request' : '#TrivialRequest' = '#TrivialRequest'(); abstract part def '#CommunicationRequest' :> '#OKrequest' { doc /* Communication requests, which have an associated communication channel. * They may block until some ressource is available, depending of the kind of channel */ readonly item '@channel' : '#Channel'; readonly attribute is_send : Boolean { doc /* request direction, which is redoundant with #CommunicationRequest * specializations but useful to factor some descriptions in the model */ } comment about '#CommunicationRequest', '#SendRequest', '#ReceiveRequest' /* communication requests have a direction: they are "send" or "receive". * #SendRequest, #ReceiveRequest, is a partition of #CommunicationRequest */ } part def '#SendRequest' :> '#CommunicationRequest' { doc /* Requests for sending. They have have a '@payload' (message) */ readonly item '@payload' : '#Message'; readonly attribute is_send redefines is_send = true; } part def '#ReceiveRequest' :> '#CommunicationRequest' { doc /* Requests for receiving */ readonly attribute is_send redefines is_send = false; } part def '#RequestPool' { doc /* Pools of requests. They contain one request per outgoing transition of the * state they are associated to. They have an associated requestor identifying * who to send the response to (generally the block whose statemachine contains * the state) */ occurrence requestor : Occurrence; item pool : '#Request'[0..*] ordered default := '#immediate_request'; assert constraint { doc /* Request indexes correspond to their range in the pool. This is a redoundancy which * makes the model more simple. * Moreover, all transitions from the concerned state in Avatar model have a corresponding * request , which cannot be expressed here as it is a constraint on the software which * generates the Avatar SysMLV2 model */ calc test { first start; then assign result := true; then for i in 1..size(pool) { first start; then assign result := result & pool#(i).'@index' == i; } } test() } } // AVATAR RESPONSES TO REQUESTS ============================================ item def '#Response' { doc /* Response to submitted request pool. * Once a request in a pool is satisfied, the server sends a response to * the requestor associated to the pool. This response identifies the * (index of the transition corresponding to the) satisfied request in * the pool and carry some '@payload'. A '@payload' is a message (which can be * a simple acknowledgement). * Remark: As a resquestor should not have more than one pending request * pool in the server, this information is sufficient to root the response * to a unique well identified waiting accept action. */ attribute '@index' : Positive default 1; item '@payload' : '#Message' default '#ack_message'; } item '#default_response' = '#Response'(); // THE SERVER ITSELF ======================================================== part '#transitionRequestServer' { doc /* The transition request server. * General Principle: all submitted requests are initially considered as * '@delay'ed (even if their '@delay' is 0, for homogeneity). '@delay'ed requests * wake up themself at their "trigger_at" date and launch the server's * request handler on themself. The handler try to execute the request * which may lead either to successfull or blocking issue. * A request wake up once and only once. After this, if it is blocked * (pending communication), it remains in the pending requests but is * removed from the '@delay'ed requests. Such requests may then be unblocked * by the wake-up of a dual request (a send for a receive or a receive for * a send), or discarded if some other request in their pool is selected * to be executed. Pools may also wait forever... * * The server maintains two sets of pending pools: * - general pending pools are all pending pools, containing all * pending requests * - '@delay'ed pending pools (each one associated to a general pending pools) * are pools containing '@delay'ed requests, and only '@delay'ed requests. * * When a pool is submitted by a state, it is added to both sets. * When a '@delay'ed request wake up, it is removed from its '@delay'ed pool (which * is removed when becomming empty -- facultatif) and if handling * this request leads to a success, all involved pools are removed from * both sets and relevant responses are send to the involved communicating * ends (which pools and which responses depends on channel types). */ item localClock : Clock redefines localClock = '#systemClock' ; private individual item def Kill { doc /* type of the local kill signal */ } private individual item kill : Kill { doc /* The local kill signal to be sent to '@delay'ed requests when * their pool is deleted because some other request in the pool * is executed */ } private item mutex : '#Lock'; private item request_pools : '#RequestPool'[0..*] ordered { doc /* general pending pools containing requests waiting for wake up * or available ressources */ } protected item delayed_pools : '#delayedPool'[0..*] ordered { doc /* '@delay'ed pending pools containing requests waiting for wake up */ } part def '#delayedPool' { doc /* Type of pools containing '@delay'ed requests. Such pools are * associated to the general request pool that contain their requests. */ item request_pool : '#RequestPool'; item delayed_requests : '#delayedRequest'[0..*] ordered; } part def '#delayedRequest' { doc /* (operational) structure for '@delay'ed requests */ attribute trigger_at : Positive, TimeInstantValue { doc /* The date at which to wake up */ } item delayed_pool : '#delayedPool' { doc /* the '@delay'ed pool the request belongs to */} attribute '@index' : Positive; private readonly item myself = self; // technical... to be referenced in sub-components protected item delayed_pools : '#delayedPool'[0..*] ordered = '#transitionRequestServer'.delayed_pools; // apparently required !?! state statemachine { doc /* waits for wake up or kill */ entry; then state wait; transition first wait accept Kill then done; transition first wait accept at trigger_at do action { perform action remove_request subsets '#multiset_remove' { in multiset redefines multiset = delayed_pool.delayed_requests; in pred redefines pred = '#identity'(myself); } then if delayed_pool.delayed_requests == null { perform action remove_delayed_pool subsets '#multiset_remove' { in multiset redefines multiset = delayed_pools; in pred redefines pred = '#identity'(delayed_pool); } then done; } then action wake_up subsets '#transitionRequestServer'.request_wake_up { in request_pool redefines request_pool = delayed_pool.request_pool; in transition_id redefines transition_id = transition_id; } } then done; } first statemachine then done; } // FOR SUBMITTING A REQUEST POOL ========================================== action submit { // add parameter pool to request_pools and delayed_pools in request_pool : '#RequestPool'; part delayed_pool : '#delayedPool' { item request_pool : '#RequestPool' redefines request_pool = request_pool; item delayed_requests : '#delayedRequest'[0..*] redefines delayed_requests := null; } first start; then perform mutex.lock; for i in 1..size(request_pool.pool) { if request_pool.pool#(i) hastype '#OKrequest' { // build '@delay'ed request for ith request item request := request_pool.pool#(i) as '#OKrequest'; part delayed_request : '#delayedRequest' { in trigger_at redefines trigger_at = request.trigger_at; in delayed_pool redefines delayed_pool = delayed_pool; in '@index' redefines '@index' = request.'@index'; } first start; then // add '@delay'ed request assign delayed_pool.delayed_requests := (delayed_pool.delayed_requests,delayed_request); then delayed_request.start; } } then assign request_pools := (request_pools, request_pool); // add general request pool then if delayed_pools.delayed_requests != null { assign delayed_pools := (delayed_pools, delayed_pool); } // add '@delay'ed request pool if not empty then perform mutex.unlock; then done; } // REQUEST WAKE UP: tries request synchronization if request has not already been handled comment about request, request_pool /* ALGORITHMIC VARIABLES: inputs for trying a synchronization */ protected part request : '#Request'{ doc /* ALGORITHMIC VARIABLE : the request to be synchronized */ } protected part request_pool : '#RequestPool'{ doc /* tALGORITHMIC VARIABLE :he pool of the request to be synchronized */ } action request_wake_up { doc /* verifies that request has not already been synchronized and if not, try to synchronize it */ in request_pool : '#RequestPool'; in transition_id : Positive; first start; then perform mutex.lock; then perform action todo subsets '#multiset_exists'{ in multiset redefines multiset = request_pools; in pred redefines pred = '#identity'(request_pool); } then if todo.result { first start; then assign request := request_pool#(transition_id); then assign request_pool := request_pool; then perform try_request_synchronization; } then perform mutex.unlock; then done; } action try_request_synchronization { first start; then perform search_request_synchronization; then perform remove_request_pools; then perform send_mails; then done; } // SUBROUTINE : removing synchronized pending pools ========================= item pools_to_remove : '#RequestPool'[0..2] { doc /* ALGORITHMIC VARIABLE: synchronized pools, i.e associated to synchronized * requests. Updated when finding a synchronization. Removed from pending pools * when executing this synchronization. */ } protected action remove_request_pools { doc /* Removing pools in pools_to_remove, i.e removing them from both sets of * pending pools (general and '@delay'ed). This also kills the process in '@delay'ed * requests which are waiting for either a trigger signal or a kill signal. */ // subroutine action kill_delayed_pool_requests : '#Unary_action' { doc /* kill all waiting requests of a parameter '@delay'ed pool */ in parameter : '#delayedPool'; first start; then for r in parameter.delayed_requests { first start; then send kill to r; } } first start; then for p : '#RequestPool' in pools_to_remove { // subroutine calc has_request_pool : '#Unary_predicate' { doc /* test if the request pool of a parameter '@delay'ed pool is p */ in parameter : '#delayedPool'; parameter.request_pool == p } first start; then action remove_request_pool subsets '#multiset_remove' { // remove pool from general pending pools in multiset redefines multiset = request_pools; in pred redefines pred = '#identity'(p); } then action remove_delayed_pool subsets '#multiset_iter' { in multiset redefines multiset = delayed_pools; in pred redefines pred = has_request_pool; in if_act redefines if_act = kill_delayed_pool_requests; } succession flow remove_delayed_pool.discarded to update_delayed_pools.new_delayed_pools; action update_delayed_pools { in new_delayed_pools : '#RequestPool'[0..*]; first start; then assign delayed_pools := new_delayed_pools; then done; } } then done; } // SUBROUTINE: sending responses to requestors ============================= protected part def '#Mail' { doc /* Response computed while finding synchronizations have an associated recipient, * This characterizes mails. */ item recipient: Occurrence; item response: '#Response'; } item mails_to_send : '#Mail'[0..2] { doc /* ALGORITHMIC VARIABLE: mails associated to synchronized requests. Updated * when finding a synchronization. Responses are sent to recipients when executing * this synchronization. */ } protected action send_mails { doc /* send responses to requestors of synchronized requests */ first start; then for i : Positive in 1..size(mails_to_send) { first start; then send mails_to_send#(i).response to mails_to_send#(i).recipient; then done; } } // SUBROUTINE searching a dual request for current request in pending requests =============== private part dual_request : '#Request'[0..1]{ doc /* selected dual pending request of the current request, * among the found ones */ } private part dual_request_pool : '#RequestPool'[0..1] { doc /* ALGORITHMIC VARIABLE: the pool containing the selected dual pending * request of the current request. */ } action search_pending_dual_request { doc /* search for a dual request of current request in request_pools. * updates dual_request and dual_request_pool accordingly. If no dual request * is found, they are set to null. If severall are found, one is choosed * randomly. * Dual requests are communication requests on a same channel with opposite * directions */ comment about dual_requests, dual_request_pools /* order in these two items are parallel: i-th request belongs to i-th pool */ part dual_requests : '#Request'[0..*] := null{ doc /* found requests that are dual of current request */ } part dual_request_pools : '#RequestPool'[0..*] := null { doc /* the pools containing dual requests in dual_requests, in the same order */ } first start; then assign dual_requests := null; then assign dual_request_pools := null; then decide; if request hastype '#CommunicationRequest' then search; else done; item communication_request[0..1] : '#CommunicationRequest' := request as '#CommunicationRequest'; action search assign dual_request := null; then assign dual_request_pool := null; then for p : '#RequestPool' in request_pools { item found : '#Request'[0..*] := null; // dual requests in pool p first start; then for r : '#Request' in p.pool { if r hastype '#CommunicationRequest' { item req = r as '#CommunicationRequest'; first start; then if communication_request.'@channel' == req.'@channel' and communication_request.is_send != req.is_send and communication_request.trigger_at >= localClock.currentTime { first start; then assign found := (found,r); then done; } } } then if found != null { first start; then assign dual_request_pools := (dual_request_pools, p); then assign dual_requests := (dual_requests,'#multiset_random'(found)); then done; } then done; } then if dual_requests != null { // randomly select one of the possible dual requests attribute x : Positive := '#bound_random'(low=1, high=size(dual_requests)); first start; then assign dual_request_pool:= dual_request_pools#(x); then assign dual_request:= dual_requests#(x); } else { first start; then assign dual_request_pool:= null; then assign dual_request:= null; } then done; } // SUBROUTINE add an acknowledgement for a couple (pool,request) action acknowledge { doc /* add acknowledgement in mails_to_send and pools_to_remove */ in request_pool : '#RequestPool'; in request : '#Request'; first start; then assign mails_to_send := ( mails_to_send, '#Mail'( recipient = request_pool.requestor, response = '#Response'('@index' = request.'@index', '@payload' = '#ack_message') ) ); then assign pools_to_remove := (pools_to_remove, request_pool); } // SEARCH A SYNCHRONIZATION FOR request ============================== action search_request_synchronization { doc /* search a synchronization for current request and updates mails_to_send * and pools_to_remove accordingly. If no synchronization is found, both * variables are set to null. */ first start; then assign mails_to_send := null; then assign pools_to_remove := null; then decide select_wrt_request_type; if request hastype '#NOKrequest' then done; if request hastype '#TrivialRequest' then trivial_synchro; else communication_synchro; // Synchronization of Trivialial actions -------------------------------- action trivial_synchro subsets acknowledge { doc /* Trivialial actions are always synchronized (with no other pool) and simply return * an acknowledgment */ in request_pool redefines request_pool; in request redefines request; } then done; // end of trivial_synchro : search_request_synchronization returns // Synchronization of communication requests, depends on channel type ---- item communication_request[0..1] : '#CommunicationRequest' := request as '#CommunicationRequest'; decide communication_synchro; if communication_request.'@channel' hastype '#Fifo' then fifo_synchro; else sync_synchro; // Synchronization of communication requests on fifo channel ------------ action fifo_synchro { item fifo : '#Fifo' := communication_request.'@channel' as '#Fifo'; first start; then decide select_wrt_communication_direction; if request hastype '#SendRequest' then send_synchro; else receive_synchro; action send_synchro { item send_request : '#SendRequest' := communication_request as '#SendRequest'; first start; then decide test_if_lossy; if fifo.'@relation'.'@lossy' == true then lossy_synchro; else noloss_synchro; decide lossy_synchro; // loss or not, random choice then loss_synchro; then noloss_synchro; action loss_synchro subsets acknowledge { in request_pool redefines request_pool = request_pool; in request redefines request = request; } then done; action noloss_synchro { first start; then decide test_if_full; if fifo.can_put() then put_synchro; else full_synchro; decide full_synchro; // test if blocking if fifo hastype '#NBfifo' then discard_synchro; else done; action discard_synchro subsets acknowledge { in request_pool redefines request_pool = request_pool; in request redefines request = request; } then done; action put_synchro { first start; then action put subsets fifo.put { in 'message' redefines 'message' = send_request.'@payload'; } then action ack subsets acknowledge { in request_pool redefines request_pool = request_pool; in request redefines request = request; } then perform search_pending_dual_request; then if dual_request != null { first start; then assign pools_to_remove := (pools_to_remove,dual_request_pool); then assign mails_to_send := (mails_to_send, '#Mail'( recipient = dual_request_pool.requestor, response = '#Response'( '@index' = dual_request.'@index', '@payload' = fifo.get() ))); then done; } } then done; } then done; } then done; // end of send_synchro : fifo_synchro returns action receive_synchro { first start; then if fifo.can_get() { first start; then assign mails_to_send := '#Mail'( recipient = request_pool.requestor, response = '#Response'('@index' = communication_request.'@index', '@payload' = fifo.get() )); then assign pools_to_remove := request_pool; then perform search_pending_dual_request; then if dual_request != null { first start; then assign pools_to_remove := (pools_to_remove,dual_request_pool); then action ack subsets acknowledge { in request_pool redefines request_pool = dual_request_pool; in request redefines request = dual_request; } then done; } then done; } then done; } then done; // end of receive_synchro : fifo_synchro returns } then done; // end of fifo_synchro : search_request_synchronization returns // Synchronization of communication requests on synchroneous channel ------------ action sync_synchro { first start; then decide select_wrt_communication_direction; if communication_request hastype '#SendRequest' then send_synchro; else receive_synchro; action send_synchro { first start; then perform search_pending_dual_request; then if dual_request != null { item send_request : '#SendRequest' = request as '#SendRequest'; first start; then action ack subsets acknowledge { in request_pool redefines request_pool = request_pool; in request redefines request = request; } then assign mails_to_send := ( mails_to_send, '#Mail'( recipient = dual_request_pool.requestor, response = '#Response'( '@index' = dual_request.'@index', '@payload' = send_request.'@payload' )) ); then assign pools_to_remove := (pools_to_remove, dual_request_pool); then done; } then done; } then done; // end of send_synchro : sync_synchro returns action receive_synchro { first start; then perform search_pending_dual_request; then if dual_request != null { item send_request : '#SendRequest' = dual_request as '#SendRequest'; first start; then action ack subsets acknowledge { in request_pool redefines request_pool = dual_request_pool; in request redefines request = dual_request; } then assign mails_to_send := ( mails_to_send, '#Mail'( recipient = request_pool.requestor, response = '#Response'( '@index' = request.'@index', '@payload' = send_request.'@payload' )) ); then assign pools_to_remove := (pools_to_remove, request_pool); then done; } then done; } then done; // end of receive_synchro : sync_synchro returns } then done; // end of sync_synchro : search_request_synchronization returns } // end of search_request_synchronization } }