From 59a8c6dcf4d5fde00f537c3c12ed9af5e47f7e77 Mon Sep 17 00:00:00 2001 From: Ludovic Apvrille <ludovic.apvrille@telecom-paristech.fr> Date: Thu, 19 May 2011 12:14:31 +0000 Subject: [PATCH] AVATAR code generator: support of asynchronous channels --- executablecode/Makefile.src | 2 +- executablecode/src/asyncchannel.c | 35 +++++++++ executablecode/src/asyncchannel.h | 6 +- executablecode/src/message.c | 8 ++- executablecode/src/message.h | 9 +-- executablecode/src/request.c | 9 +++ executablecode/src/request_manager.c | 102 +++++++++++++++++++++++++++ executablecode/src/tracemanager.c | 7 ++ 8 files changed, 164 insertions(+), 14 deletions(-) diff --git a/executablecode/Makefile.src b/executablecode/Makefile.src index b2ad698f79..da5a5db63a 100755 --- a/executablecode/Makefile.src +++ b/executablecode/Makefile.src @@ -1 +1 @@ -SRCS = generated_src/main.c generated_src/System.c generated_src/Receiver2.c generated_src/Receiver1.c generated_src/Sender.c \ No newline at end of file +SRCS = generated_src/main.c generated_src/Generator.c generated_src/TestBlock.c \ No newline at end of file diff --git a/executablecode/src/asyncchannel.c b/executablecode/src/asyncchannel.c index ace908b263..e2ce2ac48a 100644 --- a/executablecode/src/asyncchannel.c +++ b/executablecode/src/asyncchannel.c @@ -1,5 +1,6 @@ #include <stdlib.h> +#include "message.h" #include "asyncchannel.h" #include "myerrors.h" @@ -13,9 +14,43 @@ asyncchannel *getNewAsyncchannel(char *outname, char *inname, int isBlocking, in asyncch->outname = outname; asyncch->isBlocking = isBlocking; asyncch->maxNbOfMessages = maxNbOfMessages; + return asyncch; } void destroyAsyncchannel(asyncchannel *asyncch) { free(asyncch); } + +message* getAndRemoveOldestMessageFromAsyncChannel(asyncchannel *channel) { + message *msg; + message *previous; + + if (channel->currentNbOfMessages == 0) { + return NULL; + } + + if (channel->currentNbOfMessages == 1) { + channel->currentNbOfMessages = 0; + msg = channel->pendingMessages; + channel->pendingMessages = NULL; + return msg; + } + + msg = channel->pendingMessages; + previous = msg; + while(msg->next != NULL) { + previous = msg; + msg = msg->next; + } + + channel->currentNbOfMessages = channel->currentNbOfMessages -1; + previous->next = NULL; + return msg; +} + +void addMessageToAsyncChannel(asyncchannel *channel, message *msg) { + msg->next = channel->pendingMessages; + channel->pendingMessages = msg; + channel->currentNbOfMessages = channel->currentNbOfMessages+1; +} diff --git a/executablecode/src/asyncchannel.h b/executablecode/src/asyncchannel.h index f2d6e32bce..8f812359db 100644 --- a/executablecode/src/asyncchannel.h +++ b/executablecode/src/asyncchannel.h @@ -14,13 +14,15 @@ struct asyncchannel { int maxNbOfMessages; // struct request* outWaitQueue; struct request* inWaitQueue; - setOfMessages *pendingMessages; + message *pendingMessages; + int currentNbOfMessages; }; typedef struct asyncchannel asyncchannel; asyncchannel *getNewAsyncchannel(char *inname, char *outname, int isBlocking, int maxNbOfMessages); void destroyAsyncchannel(asyncchannel *syncch); - +message* getAndRemoveOldestMessageFromAsyncChannel(asyncchannel *channel); +void addMessageToAsyncChannel(asyncchannel *channel, message *msg); #endif diff --git a/executablecode/src/message.c b/executablecode/src/message.c index a2078cb3e7..bea6d5cc4c 100644 --- a/executablecode/src/message.c +++ b/executablecode/src/message.c @@ -24,13 +24,15 @@ message *getNewMessage(int nbOfParams, int *params) { criticalError("Allocation of request failed"); } msg->nbOfParams = nbOfParams; - msg->params = params; + msg->params = params; return msg; } + + void destroyMessageWithParams(message *msg) { - free(msg->params); - free(msg); + free(msg->params); + free(msg); } void destroyMessage(message *msg) { diff --git a/executablecode/src/message.h b/executablecode/src/message.h index 8aba1b84d7..59b99e8c90 100644 --- a/executablecode/src/message.h +++ b/executablecode/src/message.h @@ -2,15 +2,8 @@ #define MESSAGE_H -struct message; - -struct setOfMessages { - struct message *head; -}; - -typedef struct setOfMessages setOfMessages; - struct message { + struct message *next; int nbOfParams; int *params; }; diff --git a/executablecode/src/request.c b/executablecode/src/request.c index 0e495195ed..5d5cd0e37f 100644 --- a/executablecode/src/request.c +++ b/executablecode/src/request.c @@ -24,6 +24,7 @@ request *getNewRequest(int ID, int type, int hasDelay, long minDelay, long maxDe // Delays are in microseconds void makeNewRequest(request *req, int ID, int type, int hasDelay, long minDelay, long maxDelay, int nbOfParams, int **params) { long delay; + int i; req->next = NULL; req->listOfRequests = NULL; @@ -46,6 +47,14 @@ void makeNewRequest(request *req, int ID, int type, int hasDelay, long minDelay, req->relatedRequest = NULL; + if (type == SEND_ASYNC_REQUEST) { + // Must create a new message + req->msg = getNewMessageWithParams(nbOfParams); + for(i=0; i<nbOfParams; i++) { + req->msg->params[i] = *(params[i]); + } + } + } diff --git a/executablecode/src/request_manager.c b/executablecode/src/request_manager.c index a9f52fa6a8..e39896f260 100644 --- a/executablecode/src/request_manager.c +++ b/executablecode/src/request_manager.c @@ -8,6 +8,7 @@ #include "debug.h" #include "mytimelib.h" #include "random.h" +#include "asyncchannel.h" @@ -93,6 +94,49 @@ void executeReceiveSyncTransaction(request *req) { } +void executeSendAsyncTransaction(request *req) { + request *selectedReq; + + // Full FIFO? + if (req->asyncChannel->currentNbOfMessages == req->asyncChannel->maxNbOfMessages) { + // Must remove the oldest message + getAndRemoveOldestMessageFromAsyncChannel(req->asyncChannel); + } + + addMessageToAsyncChannel(req->asyncChannel, req->msg); + + debugMsg("Signaling async write to all requests waiting "); + selectedReq = req->asyncChannel->inWaitQueue; + while (selectedReq != NULL) { + pthread_cond_signal(selectedReq->listOfRequests->wakeupCondition); + selectedReq = selectedReq->next; + } +} + +void executeReceiveAsyncTransaction(request *req) { + int i; + request *selectedReq; + + req->msg = getAndRemoveOldestMessageFromAsyncChannel(req->asyncChannel); + + debugMsg("Signaling async read to all requests waiting "); + selectedReq = req->asyncChannel->outWaitQueue; + + // Must recopy parameters + for(i=0; i<req->nbOfParams; i++) { + *(req->params[i]) = req->msg->params[i]; + } + + // unallocate message + destroyMessageWithParams(req->msg); + + while (selectedReq != NULL) { + pthread_cond_signal(selectedReq->listOfRequests->wakeupCondition); + selectedReq = selectedReq->next; + } +} + + void executeSendBroadcastTransaction(request *req) { int cpt; request *tmpreq; @@ -222,6 +266,41 @@ int executable(setOfRequests *list, int nb) { //index ++; } + if (req->type == SEND_ASYNC_REQUEST) { + debugMsg("Send async"); + + if (!(req->asyncChannel->isBlocking)) { + // Can always add a message -> executable + debugMsg("Send async executable since non blocking"); + req->executable = 1; + cpt ++; + + //blocking case ... channel full? + } else { + if (req->asyncChannel->currentNbOfMessages < req->asyncChannel->maxNbOfMessages) { + // Not full! + debugMsg("Send async executable since channel not full"); + req->executable = 1; + cpt ++; + } else { + debugMsg("Send async not executable: full, and channel is blocking"); + } + } + } + + if (req->type == RECEIVE_ASYNC_REQUEST) { + debugMsg("receive async"); + if (req->asyncChannel->currentNbOfMessages >0) { + debugMsg("Receive async executable: not empty"); + req->executable = 1; + cpt ++; + } else { + debugMsg("Receive async not executable: empty"); + } + //index ++; + } + + if (req->type == SEND_BROADCAST_REQUEST) { debugMsg("send broadcast"); req->executable = 1; @@ -234,6 +313,8 @@ int executable(setOfRequests *list, int nb) { req->executable = 0; //index ++; } + + if (req->type == IMMEDIATE) { @@ -259,12 +340,25 @@ void private__makeRequestPending(setOfRequests *list) { req->syncChannel->outWaitQueue = addToRequestQueue(req->syncChannel->outWaitQueue, req); req->alreadyPending = 1; } + if (req->type == RECEIVE_SYNC_REQUEST) { debugMsg("Adding pending request in inWaitqueue"); req->alreadyPending = 1; req->syncChannel->inWaitQueue = addToRequestQueue(req->syncChannel->inWaitQueue, req); } + if (req->type == SEND_ASYNC_REQUEST) { + debugMsg("Adding pending request in outWaitqueue"); + req->asyncChannel->outWaitQueue = addToRequestQueue(req->asyncChannel->outWaitQueue, req); + req->alreadyPending = 1; + } + + if (req->type == RECEIVE_ASYNC_REQUEST) { + debugMsg("Adding pending request in inWaitqueue"); + req->alreadyPending = 1; + req->asyncChannel->inWaitQueue = addToRequestQueue(req->asyncChannel->inWaitQueue, req); + } + if (req->type == RECEIVE_BROADCAST_REQUEST) { debugMsg("Adding pending broadcast request in inWaitqueue"); req->alreadyPending = 1; @@ -292,6 +386,14 @@ void private__makeRequest(request *req) { executeReceiveSyncTransaction(req); } + if (req->type == SEND_ASYNC_REQUEST) { + executeSendAsyncTransaction(req); + } + + if (req->type == RECEIVE_ASYNC_REQUEST) { + executeReceiveAsyncTransaction(req); + } + if (req->type == SEND_BROADCAST_REQUEST) { executeSendBroadcastTransaction(req); } diff --git a/executablecode/src/tracemanager.c b/executablecode/src/tracemanager.c index b06dbaeb12..3befa1d411 100644 --- a/executablecode/src/tracemanager.c +++ b/executablecode/src/tracemanager.c @@ -137,6 +137,13 @@ void traceRequest(char *myname, request *req) { case RECEIVE_SYNC_REQUEST: sprintf(s, "block=%s type=receive_synchro channel=%s\n", myname, req->syncChannel->inname); break; + case SEND_ASYNC_REQUEST: + debug2Msg("Async channel", req->asyncChannel->outname); + sprintf(s, "block=%s type=send_async channel=%s\n", myname, req->asyncChannel->outname); + break; + case RECEIVE_ASYNC_REQUEST: + sprintf(s, "block=%s type=receive_async channel=%s\n", myname, req->asyncChannel->inname); + break; case SEND_BROADCAST_REQUEST: debug2Msg("Sync channel", req->syncChannel->outname); sprintf(s, "block=%s type=send_broadcast channel=%s\n", myname, req->syncChannel->outname); -- GitLab