// AVATAR CENTRALIZED TRANSITION SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
package AvatarTransitionServer {
private import Clocks::TimeOf;
private import Time::TimeInstantValue;
private import AvatarGeneral::*;
private import AvatarCommunication::*;
public import Occurrences::Occurrence;
doc /* The server is used to select a transition to execute among the possible outgoing transitions
* of a state of an Avatar block statemachine. For this, states submit pools of requests to
* the server, where each request is associated with one of the state's outgoing transitions. Then, as soon
* as some requests in the pool are feasible, the server chooses one of them and sends a response to
* the state, identifying the selected request and providing the associated '@payload' (the transmitted
* message for receiving requests, and an acknowledgment in other cases)
*/
// AVATAR TRANSITION REQUESTS ============================================
abstract part def '#Request' {
doc /* The most general request type. Outgoing transitions from a state are statically ordered.
* Thus associated requests have an index w.r.t. this order, which identifies the transition they
* are associated with.
*/
readonly attribute '@index' : Positive default 1;
comment Request_Partition about '#OKrequest', '#NOKrequest', '#Request'
/* Requests are either certainly unfeasible (a #NOKrequest has a false guard)
* or potentially feasible (an #OKrequest has a true guard).
* #OKrequest, #NOKrequest is a partition of #Request
*/
}
part def '#NOKrequest' :> '#Request' {
doc /* Certainly unfeasible requests, i.e. requests with a false guard.
* They cannot be selected by the centralized server.
*/
}
abstract part def '#OKrequest' :> '#Request' {
doc /* Potentially feasible requests, i.e. requests with a true guard.
* They may be selected by the centralized server.
* They may be '@delay'ed by an "after", thus they have a trigger date w.r.t.
* the system discrete clock.
*/
item localClock : Clock redefines localClock = '#systemClock' ;
readonly attribute '@delay' : Positive default 0;
derived attribute trigger_at = TimeOf(clock=localClock, o=self) + '@delay';
comment OKrequest_Partition about '#OKrequest', '#TrivialRequest', '#CommunicationRequest'
/* OK requests may be without dependency and thus trivially feasible (#TrivialRequest).
* Non-trivial requests are the ones associated with communication (#CommunicationRequest).
* They may require synchronization, or a channel or data to be available.
* #TrivialRequest, #CommunicationRequest is a partition of #OKrequest
*/
}
part def '#TrivialRequest' :> '#OKrequest'{
doc /* Trivial requests don't communicate, thus don't block. Associated transitions
* only execute actions such as assignments, external method calls, ...
*/
}
item '#immediate_request' : '#TrivialRequest' = '#TrivialRequest'();
abstract part def '#CommunicationRequest' :> '#OKrequest' {
doc /* Communication requests, which have an associated communication channel.
* They may block until some resource is available, depending on the kind of channel
*/
readonly item '@channel' : '#Channel';
readonly attribute is_send : Boolean {
doc /* request direction, which is redundant with #CommunicationRequest
* specializations but useful to factor some descriptions in the model
*/
}
comment about '#CommunicationRequest', '#SendRequest', '#ReceiveRequest'
/* Communication requests have a direction: they are "send" or "receive".
* #SendRequest, #ReceiveRequest is a partition of #CommunicationRequest
*/
}
part def '#SendRequest' :> '#CommunicationRequest' {
doc /* Requests for sending. They have a '@payload' (message) */
readonly item '@payload' : '#Message';
readonly attribute is_send redefines is_send = true;
}
part def '#ReceiveRequest' :> '#CommunicationRequest' {
doc /* Requests for receiving */
readonly attribute is_send redefines is_send = false;
}
part def '#RequestPool' {
doc /* Pools of requests. They contain one request per outgoing transition of the
* state they are associated with. They have an associated requestor identifying
* whom to send the response to (generally the block whose statemachine contains
* the state)
*/
occurrence requestor : Occurrence;
item pool : '#Request'[0..*] ordered default := '#immediate_request';
assert constraint {
doc /* Request indexes correspond to their rank in the pool. This is a redundancy which
* makes the model simpler.
* Moreover, all transitions from the concerned state in the Avatar model have a corresponding
* request, which cannot be expressed here as it is a constraint on the software which
* generates the Avatar SysMLV2 model
*/
calc test {
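// conjunction over all pool positions: the request at position i must carry index i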
first start;
then assign result := true;
then for i in 1..size(pool) {
first start;
then assign result := result & pool#(i).'@index' == i;
}
}
test()
}
}
// AVATAR RESPONSES TO REQUESTS ============================================
item def '#Response' {
doc /* Response to a submitted request pool.
* Once a request in a pool is satisfied, the server sends a response to
* the requestor associated with the pool. This response identifies the
* (index of the transition corresponding to the) satisfied request in
* the pool and carries some '@payload'. A '@payload' is a message (which can be
* a simple acknowledgement).
* Remark: As a requestor should not have more than one pending request
* pool in the server, this information is sufficient to route the response
* to a unique, well-identified waiting accept action.
*/
attribute '@index' : Positive default 1;
item '@payload' : '#Message' default '#ack_message';
}
item '#default_response' = '#Response'();
// THE SERVER ITSELF ========================================================
part '#transitionRequestServer' {
doc /* The transition request server.
* General Principle: all submitted requests are initially considered as
* '@delay'ed (even if their '@delay' is 0, for homogeneity). '@delay'ed requests
* wake themselves up at their "trigger_at" date and launch the server's
* request handler on themselves. The handler tries to execute the request,
* which may lead either to a successful or a blocking issue.
* A request wakes up once and only once. After this, if it is blocked
* (pending communication), it remains in the pending requests but is
* removed from the '@delay'ed requests. Such requests may then be unblocked
* by the wake-up of a dual request (a send for a receive or a receive for
* a send), or discarded if some other request in their pool is selected
* to be executed. Pools may also wait forever...
*
* The server maintains two sets of pending pools:
* - general pending pools are all pending pools, containing all
* pending requests
* - '@delay'ed pending pools (each one associated with a general pending pool)
* are pools containing '@delay'ed requests, and only '@delay'ed requests.
*
* When a pool is submitted by a state, it is added to both sets.
* When a '@delay'ed request wakes up, it is removed from its '@delay'ed pool (which
* is removed when it becomes empty -- optional) and if handling
* this request leads to a success, all involved pools are removed from
* both sets and relevant responses are sent to the involved communicating
* ends (which pools and which responses depend on channel types).
*/
item localClock : Clock redefines localClock = '#systemClock' ;
private individual item def Kill { doc /* type of the local kill signal */ }
private individual item kill : Kill {
doc /* The local kill signal to be sent to '@delay'ed requests when
* their pool is deleted because some other request in the pool
* is executed
*/
}
private item mutex : '#Lock';
private item request_pools : '#RequestPool'[0..*] ordered {
doc /* general pending pools containing requests waiting for wake up
* or available resources
*/
}
protected item delayed_pools : '#delayedPool'[0..*] ordered {
doc /* '@delay'ed pending pools containing requests waiting for wake up */
part def '#delayedPool' {
doc /* Type of pools containing '@delay'ed requests. Such pools are
* associated with the general request pool that contains their requests.
*/
item request_pool : '#RequestPool';
item delayed_requests : '#delayedRequest'[0..*] ordered;
part def '#delayedRequest' {
doc /* (operational) structure for '@delay'ed requests */
attribute trigger_at : Positive, TimeInstantValue { doc /* The date at which to wake up */ }
item delayed_pool : '#delayedPool' { doc /* the '@delay'ed pool the request belongs to */}
attribute '@index' : Positive;
private readonly item myself = self; // technical... to be referenced in sub-components
protected item delayed_pools : '#delayedPool'[0..*] ordered = '#transitionRequestServer'.delayed_pools;
// apparently required !?!
state statemachine {
doc /* waits for wake up or kill */
entry; then state wait;
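// killed: another request of the same pool has been selected, this delayed request is discarded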
transition first wait accept Kill then done;
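// trigger date reached: remove this request from its delayed pool (dropping the pool once empty), then run the server's wake-up handler on it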
transition first wait accept at trigger_at
do action {
perform action remove_request subsets
'#multiset_remove' {
in multiset redefines multiset = delayed_pool.delayed_requests;
in pred redefines pred = '#identity'(myself);
}
then if delayed_pool.delayed_requests == null {
perform action remove_delayed_pool subsets
'#multiset_remove' {
in multiset redefines multiset = delayed_pools;
in pred redefines pred = '#identity'(delayed_pool);
} then done;
}
then action wake_up subsets '#transitionRequestServer'.request_wake_up {
in request_pool redefines request_pool = delayed_pool.request_pool;
in transition_id redefines transition_id = '@index'; // the request's '@index' identifies the transition
}
}
then done;
}
first statemachine then done;
}
// FOR SUBMITTING A REQUEST POOL ==========================================
action submit { // add parameter pool to request_pools and delayed_pools
in request_pool : '#RequestPool';
part delayed_pool : '#delayedPool' {
item request_pool : '#RequestPool' redefines request_pool = request_pool;
item delayed_requests : '#delayedRequest'[0..*] redefines delayed_requests := null;
}
first start; then perform mutex.lock;
for i in 1..size(request_pool.pool) {
if request_pool.pool#(i) hastype '#OKrequest' { // build '@delay'ed request for ith request
item request := request_pool.pool#(i) as '#OKrequest';
part delayed_request : '#delayedRequest' {
in trigger_at redefines trigger_at = request.trigger_at;
in delayed_pool redefines delayed_pool = delayed_pool;
in '@index' redefines '@index' = request.'@index';
first start; then // add '@delay'ed request
assign delayed_pool.delayed_requests := (delayed_pool.delayed_requests,delayed_request);
then delayed_request.start;
}
}
then assign request_pools := (request_pools, request_pool); // add general request pool
then if delayed_pool.delayed_requests != null {
assign delayed_pools := (delayed_pools, delayed_pool);
} // add '@delay'ed request pool if not empty
then perform mutex.unlock;
then done;
}
// REQUEST WAKE UP: tries request synchronization if request has not already been handled
comment about request, request_pool /* ALGORITHMIC VARIABLES: inputs for trying a synchronization */
protected part request : '#Request'{ doc /* ALGORITHMIC VARIABLE: the request to be synchronized */ }
protected part request_pool : '#RequestPool'{ doc /* ALGORITHMIC VARIABLE: the pool of the request to be synchronized */ }
action request_wake_up {
doc /* verifies that the request has not already been synchronized and, if not, tries to synchronize it */
in request_pool : '#RequestPool';
in transition_id : Positive;
first start; then perform mutex.lock;
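// the pool may already have been served and removed by a dual request's wake-up; handle the request only if its pool is still pending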
then perform action todo subsets '#multiset_exists'{
in multiset redefines multiset = request_pools;
in pred redefines pred = '#identity'(request_pool); }
then if todo.result {
first start;
then assign request := request_pool.pool#(transition_id);
then assign request_pool := request_pool;
then perform try_request_synchronization;
}
then perform mutex.unlock;
then done;
}
action try_request_synchronization {
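doc /* Pipeline for one wake-up: search a synchronization for the current request, remove the synchronized pools from the pending sets, then send the computed responses */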
first start;
then perform search_request_synchronization;
then perform remove_request_pools;
then perform send_mails;
then done;
}
// SUBROUTINE : removing synchronized pending pools =========================
item pools_to_remove : '#RequestPool'[0..2] {
doc /* ALGORITHMIC VARIABLE: synchronized pools, i.e. pools associated with synchronized
* requests. Updated when finding a synchronization. Removed from pending pools
* when executing this synchronization.
*/
}
protected action remove_request_pools {
doc /* Removing pools in pools_to_remove, i.e. removing them from both sets of
* pending pools (general and '@delay'ed). This also kills the process in '@delay'ed
* requests which are waiting for either a trigger signal or a kill signal.
*/
// subroutine
action kill_delayed_pool_requests : '#Unary_action' {
doc /* kill all waiting requests of a parameter '@delay'ed pool */
in parameter : '#delayedPool';
first start;
then for r in parameter.delayed_requests { first start; then send kill to r; }
}
first start;
then for p : '#RequestPool' in pools_to_remove {
// subroutine
calc has_request_pool : '#Unary_predicate' {
doc /* test if the request pool of a parameter '@delay'ed pool is p */
in parameter : '#delayedPool'; parameter.request_pool == p
}
first start;
then action remove_request_pool subsets '#multiset_remove' { // remove pool from general pending pools
in multiset redefines multiset = request_pools;
in pred redefines pred = '#identity'(p);
}
then action remove_delayed_pool subsets '#multiset_iter' {
in multiset redefines multiset = delayed_pools;
in pred redefines pred = has_request_pool;
in if_act redefines if_act = kill_delayed_pool_requests;
}
succession flow remove_delayed_pool.discarded to update_delayed_pools.new_delayed_pools;
action update_delayed_pools {
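doc /* replaces delayed_pools with the pools flowed from remove_delayed_pool, i.e. the delayed pools left after removal */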
in new_delayed_pools : '#delayedPool'[0..*];
first start; then assign delayed_pools := new_delayed_pools; then done;
}
}
then done;
}
// SUBROUTINE: sending responses to requestors =============================
protected part def '#Mail' {
doc /* Responses computed while finding synchronizations have an associated recipient.
* This characterizes mails.
*/
item recipient: Occurrence;
item response: '#Response';
}
item mails_to_send : '#Mail'[0..2] {
doc /* ALGORITHMIC VARIABLE: mails associated to synchronized requests. Updated
* when finding a synchronization. Responses are sent to recipients when executing
* this synchronization.
*/
}
protected action send_mails {
doc /* send responses to requestors of synchronized requests */
first start;
then for i : Positive in 1..size(mails_to_send) {
first start;
then send mails_to_send#(i).response to mails_to_send#(i).recipient;
then done;
}
}
// SUBROUTINE searching a dual request for current request in pending requests ===============
private part dual_request : '#Request'[0..1]{
doc /* ALGORITHMIC VARIABLE: the selected dual pending request of the current request,
* among the found ones
*/
}
private part dual_request_pool : '#RequestPool'[0..1] {
doc /* ALGORITHMIC VARIABLE: the pool containing the selected dual pending
* request of the current request.
*/
}
action search_pending_dual_request {
doc /* Searches for a dual request of the current request in request_pools and
* updates dual_request and dual_request_pool accordingly. If no dual request
* is found, they are set to null. If several are found, one is chosen
* randomly.
* Dual requests are communication requests on the same channel with opposite
* directions
*/
comment about dual_requests, dual_request_pools
/* the orders in these two items are parallel: the i-th request belongs to the i-th pool */
part dual_requests : '#Request'[0..*] := null{
doc /* found requests that are dual of current request */
}
part dual_request_pools : '#RequestPool'[0..*] := null {
doc /* the pools containing dual requests in dual_requests, in the same order */
}
first start;
then assign dual_requests := null;
then assign dual_request_pools := null;
then decide;
if request hastype '#CommunicationRequest' then search;
else done;
item communication_request[0..1] : '#CommunicationRequest' := request as '#CommunicationRequest';
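// scan every pending pool for communication requests on the same channel as the current request, with the opposite direction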
action search assign dual_request := null;
then assign dual_request_pool := null;
then for p : '#RequestPool' in request_pools {
item found : '#Request'[0..*] := null; // dual requests in pool p
first start;
then for r : '#Request' in p.pool {
if r hastype '#CommunicationRequest' {
item req = r as '#CommunicationRequest';
first start;
then if communication_request.'@channel' == req.'@channel' and
communication_request.is_send != req.is_send and
communication_request.trigger_at >= localClock.currentTime
{ first start; then assign found := (found,r); then done; }
}
}
then if found != null {
first start;
then assign dual_request_pools := (dual_request_pools, p);
then assign dual_requests := (dual_requests,'#multiset_random'(found));
then done;
}
then done;
}
then if dual_requests != null { // randomly select one of the possible dual requests
attribute x : Positive := '#bound_random'(low=1, high=size(dual_requests));
first start;
then assign dual_request_pool:= dual_request_pools#(x);
then assign dual_request:= dual_requests#(x);
}
else {
first start;
then assign dual_request_pool:= null;
then assign dual_request:= null;
}
then done;
}
// SUBROUTINE: add an acknowledgement for a pair (pool, request) ===================
action acknowledge {
doc /* add acknowledgement in mails_to_send and pools_to_remove */
in request_pool : '#RequestPool';
in request : '#Request';
first start;
then assign mails_to_send := ( mails_to_send,
'#Mail'( recipient = request_pool.requestor,
response = '#Response'('@index' = request.'@index',
'@payload' = '#ack_message') ) );
then assign pools_to_remove := (pools_to_remove, request_pool);
}
// SEARCH A SYNCHRONIZATION FOR request ==============================
action search_request_synchronization {
doc /* Searches a synchronization for the current request and updates mails_to_send
* and pools_to_remove accordingly. If no synchronization is found, both
* variables are set to null.
*/
first start;
then assign mails_to_send := null;
then assign pools_to_remove := null;
then decide select_wrt_request_type;
if request hastype '#NOKrequest' then done;
if request hastype '#TrivialRequest' then trivial_synchro;
else communication_synchro;
// Synchronization of trivial actions --------------------------------
action trivial_synchro subsets acknowledge {
doc /* Trivial actions are always synchronized (with no other pool) and simply return
* an acknowledgment
*/
in request_pool redefines request_pool;
in request redefines request;
}
then done; // end of trivial_synchro : search_request_synchronization returns
// Synchronization of communication requests, depends on channel type ----
item communication_request[0..1] : '#CommunicationRequest' := request as '#CommunicationRequest';
decide communication_synchro;
if communication_request.'@channel' hastype '#Fifo' then fifo_synchro;
else sync_synchro;
// Synchronization of communication requests on fifo channel ------------
action fifo_synchro {
item fifo : '#Fifo' := communication_request.'@channel' as '#Fifo';
first start;
then decide select_wrt_communication_direction;
if request hastype '#SendRequest' then send_synchro;
else receive_synchro;
action send_synchro {
item send_request : '#SendRequest' := communication_request as '#SendRequest';
first start;
then decide test_if_lossy;
if fifo.'@relation'.'@lossy' == true then lossy_synchro;
else noloss_synchro;
decide lossy_synchro; // loss or not, random choice
then loss_synchro;
then noloss_synchro;
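// the message is lost: the sender is acknowledged but nothing is put in the fifo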
action loss_synchro subsets acknowledge {
in request_pool redefines request_pool = request_pool;
in request redefines request = request;
}
then done;
action noloss_synchro {
first start;
then decide test_if_full;
if fifo.can_put() then put_synchro;
else full_synchro;
decide full_synchro; // test if blocking
if fifo hastype '#NBfifo' then discard_synchro;
else done;
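// full non-blocking fifo: the message is dropped but the sender is still acknowledged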
action discard_synchro subsets acknowledge {
in request_pool redefines request_pool = request_pool;
in request redefines request = request;
}
then done;
action put_synchro {
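// room in the fifo: store the message, acknowledge the sender, then try to serve a pending receiver with the stored message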
first start;
then action put subsets fifo.put {
in 'message' redefines 'message' = send_request.'@payload';
}
then action ack subsets acknowledge {
in request_pool redefines request_pool = request_pool;
in request redefines request = request;
}
then perform search_pending_dual_request;
then
if dual_request != null {
first start;
then assign pools_to_remove := (pools_to_remove,dual_request_pool);
then assign mails_to_send := (mails_to_send,
'#Mail'( recipient = dual_request_pool.requestor,
response = '#Response'( '@index' = dual_request.'@index',
'@payload' = fifo.get() )));
then done;
}
} then done;
} then done;
} then done; // end of send_synchro : fifo_synchro returns
action receive_synchro {
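// fifo receive: if a message is available, deliver it to the requestor, then acknowledge a pending sender if any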
first start;
then if fifo.can_get() {
first start;
then assign mails_to_send :=
'#Mail'( recipient = request_pool.requestor,
response = '#Response'('@index' = communication_request.'@index',
'@payload' = fifo.get() ));
then assign pools_to_remove := request_pool;
then perform search_pending_dual_request;
then
if dual_request != null {
first start;
then assign pools_to_remove := (pools_to_remove,dual_request_pool);
then action ack subsets acknowledge {
in request_pool redefines request_pool = dual_request_pool;
in request redefines request = dual_request;
} then done;
} then done;
} then done;
} then done; // end of receive_synchro : fifo_synchro returns
} then done; // end of fifo_synchro : search_request_synchronization returns
// Synchronization of communication requests on synchronous channel ------------
action sync_synchro {
first start;
then decide select_wrt_communication_direction;
if communication_request hastype '#SendRequest' then send_synchro;
else receive_synchro;
action send_synchro {
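// synchronous send: succeeds only if a pending receiver exists; the sender is acknowledged and the payload is delivered to the receiver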
first start;
then perform search_pending_dual_request;
then
if dual_request != null {
item send_request : '#SendRequest' = request as '#SendRequest';
first start;
then action ack subsets acknowledge {
in request_pool redefines request_pool = request_pool;
in request redefines request = request;
}
then assign mails_to_send := ( mails_to_send,
'#Mail'( recipient = dual_request_pool.requestor,
response = '#Response'(
'@index' = dual_request.'@index',
'@payload' = send_request.'@payload' )) );
then assign pools_to_remove := (pools_to_remove, dual_request_pool);
then done;
} then done;
} then done; // end of send_synchro : sync_synchro returns
action receive_synchro {
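// synchronous receive: succeeds only if a pending sender exists; its payload is delivered to this requestor and the sender is acknowledged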
first start;
then perform search_pending_dual_request;
then
if dual_request != null {
item send_request : '#SendRequest' = dual_request as '#SendRequest';
first start;
then action ack subsets acknowledge {
in request_pool redefines request_pool = dual_request_pool;
in request redefines request = dual_request;
}
then assign mails_to_send := ( mails_to_send,
'#Mail'( recipient = request_pool.requestor,
response = '#Response'(
'@index' = request.'@index',
'@payload' = send_request.'@payload' )) );
then assign pools_to_remove := (pools_to_remove, request_pool);
then done;
} then done;
} then done; // end of receive_synchro : sync_synchro returns
} then done; // end of sync_synchro : search_request_synchronization returns
} // end of search_request_synchronization
}
}