From 482e266566ba42316e4561967df716ab3888d648 Mon Sep 17 00:00:00 2001 From: Daniela Genius <Daniela.Genius@lip6.fr> Date: Fri, 8 Jan 2016 08:46:11 +0000 Subject: [PATCH] merged --- executablecode/src_soclib/asyncchannel.c | 128 +++++ executablecode/src_soclib/asyncchannel.h | 29 + executablecode/src_soclib/debug.c | 87 +++ executablecode/src_soclib/debug.h | 19 + executablecode/src_soclib/defs.h | 9 + executablecode/src_soclib/message.c | 69 +++ executablecode/src_soclib/message.h | 22 + executablecode/src_soclib/myerrors.c | 24 + executablecode/src_soclib/myerrors.h | 11 + executablecode/src_soclib/mytimelib.c | 108 ++++ executablecode/src_soclib/mytimelib.h | 19 + executablecode/src_soclib/random.c | 43 ++ executablecode/src_soclib/random.h | 7 + executablecode/src_soclib/request.c | 272 +++++++++ executablecode/src_soclib/request.h | 89 +++ executablecode/src_soclib/request_manager.c | 583 ++++++++++++++++++++ executablecode/src_soclib/request_manager.h | 14 + executablecode/src_soclib/syncchannel.c | 112 ++++ executablecode/src_soclib/syncchannel.h | 30 + executablecode/src_soclib/tracemanager.c | 304 ++++++++++ executablecode/src_soclib/tracemanager.h | 21 + 21 files changed, 2000 insertions(+) create mode 100755 executablecode/src_soclib/asyncchannel.c create mode 100755 executablecode/src_soclib/asyncchannel.h create mode 100755 executablecode/src_soclib/debug.c create mode 100755 executablecode/src_soclib/debug.h create mode 100755 executablecode/src_soclib/defs.h create mode 100755 executablecode/src_soclib/message.c create mode 100755 executablecode/src_soclib/message.h create mode 100755 executablecode/src_soclib/myerrors.c create mode 100755 executablecode/src_soclib/myerrors.h create mode 100755 executablecode/src_soclib/mytimelib.c create mode 100755 executablecode/src_soclib/mytimelib.h create mode 100755 executablecode/src_soclib/random.c create mode 100755 executablecode/src_soclib/random.h create mode 100755 executablecode/src_soclib/request.c create mode 100755 executablecode/src_soclib/request.h create mode 100755 executablecode/src_soclib/request_manager.c create mode 100755 executablecode/src_soclib/request_manager.h create mode 100755 executablecode/src_soclib/syncchannel.c create mode 100755 executablecode/src_soclib/syncchannel.h create mode 100755 executablecode/src_soclib/tracemanager.c create mode 100755 executablecode/src_soclib/tracemanager.h diff --git a/executablecode/src_soclib/asyncchannel.c b/executablecode/src_soclib/asyncchannel.c new file mode 100755 index 0000000000..b46934f49d --- /dev/null +++ b/executablecode/src_soclib/asyncchannel.c @@ -0,0 +1,128 @@ +#include <stdlib.h> + +#include "message.h" +#include "asyncchannel.h" +#include "myerrors.h" +// ajoute DG +#include "mwmr.h" +//fin ajoute DG + + +//ajoute DG + +/* this function tries to read one message from the channel and returns either + 1 or 0*/ + +int async_read_nonblocking( struct mwmr_s *fifo, void *_ptr, int lensw ){ + int i; + i = mwmr_try_read(fifo,_ptr,lensw); + return i; +} + +/* this function tries to write one message to the channel and throws it away if unsuccessful */ + +int async_write_nonblocking( struct mwmr_s *fifo, void *_ptr, int lensw ){ + int i; + i = mwmr_try_write(fifo,_ptr,lensw); + if (i<lensw){ + /* the data item is thrown away */ + //printf("data thrown away"); + return i; + } + else{ + //printf("data transmitted"); + } + return i; +} + + + +void async_read( struct mwmr_s *fifo, void *_ptr, int lensw ){ + mwmr_read(fifo,_ptr,lensw); +} + +void async_write( struct mwmr_s *fifo, void *_ptr, int lensw ){ + mwmr_write(fifo,_ptr,lensw); + } + +//fin ajoute DG + +//DG 7.9. add MWMR as parameter +asyncchannel *getNewAsyncchannel(char *outname, char *inname, int isBlocking, int maxNbOfMessages, struct mwmr_s *fifo) { + asyncchannel * asyncch = (asyncchannel *)(malloc(sizeof(struct asyncchannel))); + if (asyncch == NULL) { + criticalError("Allocation of asyncchannel failed"); + } + asyncch->inname = inname; + asyncch->outname = outname; + asyncch->isBlocking = isBlocking; + asyncch->maxNbOfMessages = maxNbOfMessages; + //DG 7.9. add MWMR + // DG 08.12. ici bug : voir CAVE + asyncch->mwmr_fifo=fifo; + asyncch->mwmr_fifo->depth=fifo->depth; + asyncch->mwmr_fifo->width=fifo->width; +printf("asyncchannel getNew %x \n",asyncch->mwmr_fifo); +printf("asyncchannel %x \n",asyncch->mwmr_fifo->depth); +printf("asyncchannel %x \n",asyncch->mwmr_fifo->width); + + return asyncch; +} + +void destroyAsyncchannel(asyncchannel *asyncch) { + free(asyncch); +} + +/* DG il faut en meme temps gerer le manager central et les canaux MWMR */ + + message* getAndRemoveOldestMessageFromAsyncChannel(asyncchannel *channel) { + message *msg; + message *previous; + + if (channel->currentNbOfMessages == 0) { + return NULL; + } + + if (channel->currentNbOfMessages == 1) { + channel->currentNbOfMessages = 0; + msg = channel->pendingMessages;//DG 08.12. p.e. plus besoin de cela? + channel->pendingMessages = NULL; + + printf("£££££££££££££££££££££££££££\n"); printf("before async read 0\n");printf("£££££££££££££££££££££££££££\n"); + async_read(channel->mwmr_fifo, &msg, 1); + printf("£££££££££££££££££££££££££££\n");printf("after async read 0\n");printf("£££££££££££££££££££££££££££\n"); + return msg; + } + + msg = channel->pendingMessages; + previous = msg; + while(msg->next != NULL) { + previous = msg; + msg = msg->next; + } + + channel->currentNbOfMessages = channel->currentNbOfMessages -1; + previous->next = NULL; + //async_read(channel->mwmr_fifo, msg, 1); +printf("£££££££££££££££££££££££££££\n");printf("before async read 1");printf("£££££££££££££££££££££££££££\n"); + async_read(channel->mwmr_fifo, &msg, 1);//DG 08.12. +printf("£££££££££££££££££££££££££££\n");printf("after async read 1\n");printf("£££££££££££££££££££££££££££\n"); + return msg; +} + +void addMessageToAsyncChannel(asyncchannel *channel, message *msg) { + + /* DG on ajoute un champ a la structure channel */ + msg->next = channel->pendingMessages; + channel->pendingMessages = msg; + channel->currentNbOfMessages = channel->currentNbOfMessages+1; +printf("asyncchannel address %x \n",channel->mwmr_fifo); +printf("asyncchannel %x \n",channel->mwmr_fifo->depth); +printf("asyncchannel %x \n",channel->mwmr_fifo->width); + +printf("£££££££££££££££££££££££££££\n");printf("before async write 0");printf("£££££££££££££££££££££££££££\n"); +printf("channel->mwmr_fifo: %x \n",channel->mwmr_fifo); +async_write(channel->mwmr_fifo, &msg, 1 ); + printf("£££££££££££££££££££££££££££\n"); +printf("after async write 0\n");printf("£££££££££££££££££££££££££££\n"); +} diff --git a/executablecode/src_soclib/asyncchannel.h b/executablecode/src_soclib/asyncchannel.h new file mode 100755 index 0000000000..d7f6ae9bf2 --- /dev/null +++ b/executablecode/src_soclib/asyncchannel.h @@ -0,0 +1,29 @@ +#ifndef ASYNCCHANNEL_H +#define ASYNCCHANNEL_H + +struct asyncchannel; + +#include "message.h" +#include "request.h" + + +struct asyncchannel { + char *outname; + char *inname; + int isBlocking; // In writing. Reading is always blocking + int maxNbOfMessages; // + struct request* outWaitQueue; + struct request* inWaitQueue; + message *pendingMessages; + int currentNbOfMessages; + struct mwmr_s *mwmr_fifo; +}; + +typedef struct asyncchannel asyncchannel; +//DG 7.9. add MWMR as parameter +asyncchannel *getNewAsyncchannel(char *inname, char *outname, int isBlocking, int maxNbOfMessages, struct mwmr_s *fifo); +void destroyAsyncchannel(asyncchannel *syncch); +message* getAndRemoveOldestMessageFromAsyncChannel(asyncchannel *channel); +void addMessageToAsyncChannel(asyncchannel *channel, message *msg); + +#endif diff --git a/executablecode/src_soclib/debug.c b/executablecode/src_soclib/debug.c new file mode 100755 index 0000000000..a0d488cdd2 --- /dev/null +++ b/executablecode/src_soclib/debug.c @@ -0,0 +1,87 @@ +#include <stdlib.h> +#include <stdio.h> +#include <time.h> + +#include "debug.h" + + +#define DEBUG_ON 1 +#define DEBUG_OFF 2 + +int debug = DEBUG_OFF; + +void activeDebug() { + debug = DEBUG_ON; +} + +void unactiveDebug() { + debug = DEBUG_OFF; +} + +void debugThreeInts(char *msg, int value1, int value2, int value3) { + if (debug == DEBUG_OFF) { + return; + } + + if (msg != NULL) { + printf("DT> %s: %d, %d, %d\n", msg, value1, value2, value3); + } +} + +void debugTwoInts(char *msg, int value1, int value2) { + if (debug == DEBUG_OFF) { + return; + } + + if (msg != NULL) { + printf("DT> %s: %d, %d\n", msg, value1, value2); + } +} + +void debugInt(char *msg, int value) { + if (debug == DEBUG_OFF) { + return; + } + + if (msg != NULL) { + // printf("DT> %s: %d\n", msg, value); + printf("DT> %s: %x\n", msg, value); + } +} + +void debugLong(char *msg, long value) { + if (debug == DEBUG_OFF) { + return; + } + + if (msg != NULL) { + printf("DT> %s: %ld\n", msg, value); + } +} + +void debugMsg(char *msg) { + if (debug == DEBUG_OFF) { + return; + } + + if (msg != NULL) { + printf("DT> %s\n", msg); + } +} + +void debug2Msg(char *name, char *msg) { + if (debug == DEBUG_OFF) { + return; + } + + if ((name != NULL) && (msg != NULL)) { + printf("DT - %s -> %s\n", name, msg); + } +} + +void debugTime(char *msg, struct timespec *ts) { + if (debug == DEBUG_OFF) { + return; + } + printf("DT> (-------t------->) %s sec=%ld nsec=%ld\n", msg, ts->tv_sec, ts->tv_nsec); +} diff --git a/executablecode/src_soclib/debug.h b/executablecode/src_soclib/debug.h new file mode 100755 index 0000000000..7e73f3ce91 --- /dev/null +++ b/executablecode/src_soclib/debug.h @@ -0,0 +1,19 @@ + + +#ifndef DEBUG_H +#define DEBUG_H + +void activeDebug(); +void unactiveDebug(); + +void debugThreeInts(char *msg, int value1, int value2, int value3); +void debugTwoInts(char *msg, int value1, int value2); +void debugLong(char *msg, long value); +void debugInt(char *msg, int value); +void debugMsg(char *msg); +void debug2Msg(char *name, char* msg); +void debugTime(char* msg, struct timespec *ts); + +#endif + + diff --git a/executablecode/src_soclib/defs.h b/executablecode/src_soclib/defs.h new file mode 100755 index 0000000000..3b997bfdf8 --- /dev/null +++ b/executablecode/src_soclib/defs.h @@ -0,0 +1,9 @@ +#ifndef DEFS_H +#define DEFS_H + +#define bool int +#define true 1 +#define false 0 + + +#endif diff --git a/executablecode/src_soclib/message.c b/executablecode/src_soclib/message.c new file mode 100755 index 0000000000..8f2a49ae3e --- /dev/null +++ b/executablecode/src_soclib/message.c @@ -0,0 +1,69 @@ + +#include <stdlib.h> +#include <unistd.h> +#include <pthread.h> + +#include "message.h" +#include "myerrors.h" + +long __id_message = 0; +pthread_mutex_t __message_mutex; + + +void initMessages() { + if (pthread_mutex_init(&__message_mutex, NULL) < 0) { exit(-1);} +} + +long getMessageID() { + + long tmp; + printf("getMessageID adress mutex: %x\n",&__message_mutex); + //pthread_mutex_lock(&__message_mutex); + + tmp = __id_message; printf("****message ID %d \n",__id_message); + __id_message ++; + //pthread_mutex_unlock(&__message_mutex); + + return tmp; +} + +message *getNewMessageWithParams(int nbOfParams) { + debugMsg("getNewMessageWithParams"); + message *msg = (message *)(malloc(sizeof(struct message))); + if (msg == NULL) { + criticalError("Allocation of request failed"); + } + msg->nbOfParams = nbOfParams; + + msg->params = (int *)(malloc(sizeof(int) * nbOfParams)); + + //debugMsg("mutex: %x",&__message_mutex); + msg->id = getMessageID(); + + printf("msg->id: %d\n",msg->id); + + return msg; +} + +message *getNewMessage(int nbOfParams, int *params) { + + message *msg = (message *)(malloc(sizeof(struct message))); + if (msg == NULL) { + criticalError("Allocation of request failed"); + } + msg->nbOfParams = nbOfParams; + msg->params = params; + msg->id = getMessageID(); + return msg; +} + + + +void destroyMessageWithParams(message *msg) { + free(msg->params); + free(msg); +} + +void destroyMessage(message *msg) { + free(msg); +} diff --git a/executablecode/src_soclib/message.h b/executablecode/src_soclib/message.h new file mode 100755 index 0000000000..700ed07d97 --- /dev/null +++ b/executablecode/src_soclib/message.h @@ -0,0 +1,22 @@ +#ifndef MESSAGE_H +#define MESSAGE_H + + +struct message { + struct message *next; + int nbOfParams; + int *params; + long id; +}; + +typedef struct message message; + +void initMessages(); +message *getNewMessageWithParams(int nbOfParams); +message *getNewMessage(int nbOfParams, int *params); +void destroyMessageWithParams(message *msg); +void destroyMessage(message *msg); + + + +#endif diff --git a/executablecode/src_soclib/myerrors.c b/executablecode/src_soclib/myerrors.c new file mode 100755 index 0000000000..dfe16a1c46 --- /dev/null +++ b/executablecode/src_soclib/myerrors.c @@ -0,0 +1,24 @@ +#include <stdlib.h> +#include <stdio.h> + +#include "myerrors.h" + + + + +void criticalErrorInt(char *msg, int value) { + if (msg != NULL) { + printf("\nCritical error: %s, %d\n", msg, value); + } + + exit(-1); +} + + +void criticalError(char *msg) { + if (msg != NULL) { + printf("\nCritical error: %s\n", msg); + } + + exit(-1); +} diff --git a/executablecode/src_soclib/myerrors.h b/executablecode/src_soclib/myerrors.h new file mode 100755 index 0000000000..8300da3ccf --- /dev/null +++ b/executablecode/src_soclib/myerrors.h @@ -0,0 +1,11 @@ + + +#ifndef MY_ERRORS_H +#define MY_ERRORS_H + +void criticalErrorInt(char *msg, int value); +void criticalError(char *msg); + +#endif + + diff --git a/executablecode/src_soclib/mytimelib.c b/executablecode/src_soclib/mytimelib.c new file mode 100755 index 0000000000..d39c07ea85 --- /dev/null +++ b/executablecode/src_soclib/mytimelib.c @@ -0,0 +1,108 @@ +#include<time.h> + +#include "mytimelib.h" +#include "random.h" +#include "debug.h" + +#ifndef CLOCK_REALTIME +#define CLOCK_REALTIME + +int clock_gettime(struct timespec *ts) { + struct timeval tv; + gettimeofday(&tv, NULL); + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000; + return 0; +} + +int my_clock_gettime(struct timespec *tp) { + return clock_gettime(tp); +} + +#else + +int my_clock_gettime(struct timespec *tp) { + return clock_gettime(CLOCK_REALTIME, tp); +} + +#endif + + + +void addTime(struct timespec *src1, struct timespec *src2, struct timespec *dest) { + dest->tv_nsec = src1->tv_nsec + src2->tv_nsec; + dest->tv_sec = src1->tv_sec + src2->tv_sec; + if (dest->tv_nsec > 1000000000) { + dest->tv_sec = dest->tv_sec + (dest->tv_nsec / 1000000000); + dest->tv_nsec = dest->tv_nsec % 1000000000; + } +} + +void diffTime(struct timespec *src1, struct timespec *src2, struct timespec *dest) { + int diff = 0; + if (src1->tv_nsec > src2->tv_nsec) { + diff ++; + dest->tv_nsec = src2->tv_nsec - src1->tv_nsec + 1000000000; + } else { + dest->tv_nsec = src2->tv_nsec - src1->tv_nsec; + } + + dest->tv_sec = src2->tv_sec - src1->tv_sec - diff; +} + + + +int isBefore(struct timespec *src1, struct timespec *src2) { + if (src1->tv_sec > src2->tv_sec) { + return 0; + } + + if (src1->tv_sec < src2->tv_sec) { + return 1; + } + + if (src1->tv_nsec < src2->tv_nsec) { + return 1; + } + return 0; +} + +void minTime(struct timespec *src1, struct timespec *src2, struct timespec *dest) { + debugMsg("MIN TIME COMPUTATION"); + if (isBefore(src1,src2)) { + dest->tv_nsec = src1->tv_nsec; + dest->tv_sec = src1->tv_sec; + } else { + dest->tv_nsec = src2->tv_nsec; + dest->tv_sec = src2->tv_sec; + } + +} + + +void delayToTimeSpec(struct timespec *ts, long delay) { + ts->tv_nsec = (delay % 1000000)*1000; + ts->tv_sec = (delay / 1000000); +} + +void waitFor(long minDelay, long maxDelay) { + struct timespec tssrc; + struct timespec tsret; + int delay; + + + + debugMsg("Computing random delay"); + //debugLong("Min delay", minDelay); + //debugLong("Max delay", maxDelay); + delay = computeLongRandom(minDelay, maxDelay); + + debugLong("Random delay=", delay); + + delayToTimeSpec(&tssrc, delay); + + debugLong("............. waiting For", delay); + nanosleep(&tssrc, &tsret); + debugLong("............. waiting Done for: ", delay); +} + diff --git a/executablecode/src_soclib/mytimelib.h b/executablecode/src_soclib/mytimelib.h new file mode 100755 index 0000000000..76ed5d22fc --- /dev/null +++ b/executablecode/src_soclib/mytimelib.h @@ -0,0 +1,19 @@ +#ifndef MYTIMELIB_H +#define MYTIMELIB_H + +#include <time.h> +#include <sys/time.h> + + + +// in usec + +int my_clock_gettime(struct timespec *tp); +void addTime(struct timespec *src1, struct timespec *src2, struct timespec *dest); +void diffTime(struct timespec *src1, struct timespec *src2, struct timespec *dest); +int isBefore(struct timespec *src1, struct timespec *src2); +void minTime(struct timespec *src1, struct timespec *src2, struct timespec *dest); +void delayToTimeSpec(struct timespec *ts, long delay); +extern void waitFor(long minDelay, long maxDelay); + +#endif diff --git a/executablecode/src_soclib/random.c b/executablecode/src_soclib/random.c new file mode 100755 index 0000000000..b4bbddb19a --- /dev/null +++ b/executablecode/src_soclib/random.c @@ -0,0 +1,43 @@ + +#include <stdlib.h> +#include <unistd.h> +#include <time.h> +#include <limits.h> + +#include "random.h" +#include "debug.h" +#include <math.h> + +#include "mytimelib.h" + +int computeRandom(int min, int max) { + if (min == max) { + return min; + } + return (rand() % (max - min)) + min; +} + +long computeLongRandom(long min, long max) { + + if (min == max) { + return min; + } + + + long rand0 = (long)rand(); + long rand1 = rand0 % (max - min); + //debugLong("min=", min); + //debugLong("max=", max); + //debugLong("rand0", rand0); + //debugLong("rand1", rand1); + //debugLong("Random long", rand1 + min); + return rand1 + min; +} + +void initRandom() { + struct timespec ts; + + my_clock_gettime(&ts); + + srand((int)(ts.tv_nsec)); +} diff --git a/executablecode/src_soclib/random.h b/executablecode/src_soclib/random.h new file mode 100755 index 0000000000..ecff7cb43c --- /dev/null +++ b/executablecode/src_soclib/random.h @@ -0,0 +1,7 @@ +#ifndef RANDOM_H +#define RANDOM_H + +extern void initRandom(); +extern int computeRandom(int min, int max); +extern long computeLongRandom(long min, long max); +#endif diff --git a/executablecode/src_soclib/request.c b/executablecode/src_soclib/request.c new file mode 100755 index 0000000000..2cb2339a5b --- /dev/null +++ b/executablecode/src_soclib/request.c @@ -0,0 +1,272 @@ + +#include <stdlib.h> +#include <unistd.h> + +#include "request.h" +#include "mytimelib.h" +#include "myerrors.h" +#include "random.h" +#include "debug.h" + + +request *getNewRequest(int ID, int type, int hasDelay, long minDelay, long maxDelay, int nbOfParams, int **params) { + request *req = (request *)(malloc(sizeof(struct request))); + + if (req == NULL) { + criticalError("Allocation of request failed"); + } + + makeNewRequest(req, ID, type, hasDelay, minDelay, maxDelay, nbOfParams, params); + return req; +} + + +// Delays are in microseconds +void makeNewRequest(request *req, int ID, int type, int hasDelay, long minDelay, long maxDelay, int nbOfParams, int **params) { + long delay; + int i; + //debugMsg("makeNewReq"); + req->next = NULL; + req->listOfRequests = NULL; + req->nextRequestInList = NULL; + + req->type = type; + req->ID = ID; + req->hasDelay = hasDelay; + + if (req->hasDelay > 0) { + delay = computeLongRandom(minDelay, maxDelay); + delayToTimeSpec(&(req->delay), delay); + } + + req->selected = 0; + req->nbOfParams = nbOfParams; + req->params = params; + + req->alreadyPending = 0; + req->delayElapsed = 0; + + req->relatedRequest = NULL; + + if (type == SEND_ASYNC_REQUEST) { + // Must create a new message + req->msg = getNewMessageWithParams(nbOfParams); + + for(i=0; i<nbOfParams; i++) { + req->msg->params[i] = *(params[i]); + } + } + +} + + + + +void destroyRequest(request *req) { + free((void *)req); +} + +int isRequestSelected(request *req) { + return req->selected; +} + +int nbOfRequests(setOfRequests *list) { + int cpt = 0; + request *req; + + req = list->head; + + while(req != NULL) { + cpt ++; + req = req->nextRequestInList; + } + + return cpt; +} + +request *getRequestAtIndex(setOfRequests *list, int index) { + int cpt = 0; + request * req = list->head; + + while(cpt < index) { + req = req->nextRequestInList; + cpt ++; + } + + return req; + +} + + +request * addToRequestQueue(request *list, request *requestToAdd) { + request *origin = list; + + if (list == NULL) { + return requestToAdd; + } + + while(list->next != NULL) { + list = list->next; + } + + list->next = requestToAdd; + + requestToAdd->next = NULL; + + return origin; +} + +request * removeRequestFromList(request *list, request *requestToRemove) { + request *origin = list; + + if (list == requestToRemove) { + return list->next; + } + + + while(list->next != requestToRemove) { + list = list->next; + } + + list->next = requestToRemove->next; + + return origin; +} + + +void copyParameters(request *src, request *dst) { + int i; + for(i=0; i<dst->nbOfParams; i++) { + *(dst->params[i]) = *(src->params[i]); + } +} + + +void clearListOfRequests(setOfRequests *list) { + list->head = NULL; +} + +setOfRequests *newListOfRequests(pthread_cond_t *wakeupCondition, pthread_mutex_t *mutex) { + setOfRequests *list = (setOfRequests *)(malloc(sizeof(setOfRequests))); + list->head = NULL; + list->wakeupCondition = wakeupCondition; + list->mutex = mutex; + + return list; +} + +void fillListOfRequests(setOfRequests *list, char *name, pthread_cond_t *wakeupCondition, pthread_mutex_t *mutex) { + list->head = NULL; + list->owner = name; + list->wakeupCondition = wakeupCondition; + list->mutex = mutex; +} + + +void addRequestToList(setOfRequests *list, request* req) { + request *tmpreq; + + if (list == NULL) { + criticalError("NULL List in addRequestToList"); + } + + if (req == NULL) { + criticalError("NULL req in addRequestToList"); + } + + req->listOfRequests = list; + + if (list->head == NULL) { + list->head = req; + req->nextRequestInList = NULL; + return; + } + + tmpreq = list->head; + while(tmpreq->nextRequestInList != NULL) { + tmpreq = tmpreq->nextRequestInList; + } + + tmpreq->nextRequestInList = req; + req->nextRequestInList = NULL; +} + +void removeAllPendingRequestsFromPendingLists(request *req, int apartThisOne) { + setOfRequests *list = req->listOfRequests; + request *reqtmp; + + if (list == NULL) { + return; + } + + reqtmp = list->head; + + while(reqtmp != NULL) { + debugInt("Considering request of type", reqtmp->type); + if (reqtmp->alreadyPending) { + if (reqtmp->type == RECEIVE_SYNC_REQUEST) { + debugMsg("Removing send sync request from inWaitQueue"); + reqtmp->syncChannel->inWaitQueue = removeRequestFromList(reqtmp->syncChannel->inWaitQueue, reqtmp); + debugMsg("done"); + } + + if (reqtmp->type == SEND_SYNC_REQUEST) { + debugMsg("Removing receive sync request from outWaitQueue"); + reqtmp->syncChannel->outWaitQueue = removeRequestFromList(reqtmp->syncChannel->outWaitQueue, reqtmp); + debugMsg("done"); + } + + if (reqtmp->type == RECEIVE_BROADCAST_REQUEST) { + debugMsg("Removing broadcast receive request from inWaitQueue"); + reqtmp->syncChannel->inWaitQueue = removeRequestFromList(reqtmp->syncChannel->inWaitQueue, reqtmp); + debugMsg("done"); + } + } + reqtmp = reqtmp->nextRequestInList; + } +} + + +// Identical means belonging to the same ListOfRequest +// Returns the identical request if found, otherwise, null +request *hasIdenticalRequestInListOfSelectedRequests(request *req, request *list) { + + while(list != NULL) { + if (list->listOfRequests == req->listOfRequests) { + return list; + } + list = list->relatedRequest; + } + + return NULL; +} + +request* replaceInListOfSelectedRequests(request *oldRequest, request *newRequest, request *list) { + request *head = list; + + if (list == oldRequest) { + newRequest->relatedRequest = oldRequest->relatedRequest; + return newRequest; + } + + //list=list->relatedRequest; + while(list->relatedRequest != oldRequest) { + list = list->relatedRequest; + } + + list->relatedRequest = newRequest; + newRequest->relatedRequest = oldRequest->relatedRequest; + + return head; +} + + +int nbOfRelatedRequests(request *list) { + int cpt = 0; + while(list->relatedRequest != NULL) { + cpt ++; + list = list->relatedRequest; + } + + return cpt; +} diff --git a/executablecode/src_soclib/request.h b/executablecode/src_soclib/request.h new file mode 100755 index 0000000000..01144ccc7b --- /dev/null +++ b/executablecode/src_soclib/request.h @@ -0,0 +1,89 @@ +#ifndef REQUEST_H +#define REQUEST_H + +#include <time.h> +#include <pthread.h> + +struct request; + +#include "syncchannel.h" +#include "asyncchannel.h" +#include "message.h" + +#define SEND_SYNC_REQUEST 0 +#define RECEIVE_SYNC_REQUEST 2 +#define SEND_ASYNC_REQUEST 4 +#define RECEIVE_ASYNC_REQUEST 6 +#define DELAY 8 +#define IMMEDIATE 10 +#define SEND_BROADCAST_REQUEST 12 +#define RECEIVE_BROADCAST_REQUEST 14 + +typedef struct timespec timespec; + +struct setOfRequests { + char* owner; + struct request *head; + timespec startTime; + timespec completionTime; + pthread_cond_t *wakeupCondition; + pthread_mutex_t *mutex; + + int hasATimeRequest; // Means that at least one request of the list hasn't completed yet its time delay + timespec minTimeToWait; + struct request *selectedRequest; +}; + +typedef struct setOfRequests setOfRequests; + +struct request { + struct request *next; + struct setOfRequests* listOfRequests; + struct request* nextRequestInList; + struct request* relatedRequest; // For synchro and broadcast + struct syncchannel *syncChannel; + struct asyncchannel *asyncChannel; + + int type; + int ID; + int hasDelay;; + timespec delay; + int nbOfParams; // synchronous com. + int **params; // synchronous com. + message *msg; // Asynchronous comm. + + + // Filled by the request manager + int executable; + int selected; + int alreadyPending; // Whether it has been taken into account for execution or not + int delayElapsed; + timespec myStartTime; // Time at which the delay has expired +}; + +typedef struct request request; + +void makeNewRequest(request *req, int ID, int type, int hasDelay, long minDelay, long maxDelay, int nbOfParams, int **params); +request *getNewRequest(int ID, int type, int hasDelay, long minDelay, long maxDelay, int nbOfParams, int **params); +void destroyRequest(request *req); +extern int isRequestSelected(request *req); + +int nbOfRequests(setOfRequests *list); +request *getRequestAtIndex(setOfRequests *list, int index); + +request * addToRequestQueue(request *list, request *requestToAdd); +request * removeRequestFromList(request *list, request *requestToRemove); + +void copyParameters(request *src, request *dst); + +setOfRequests *newListOfRequests(pthread_cond_t *wakeupCondition, pthread_mutex_t *mutex); +void addRequestToList(setOfRequests *list, request* req); +void clearListOfRequests(setOfRequests *list); +void fillListOfRequests(setOfRequests *list, char *name, pthread_cond_t *wakeupCondition, pthread_mutex_t *mutex); + +void removeAllPendingRequestsFromPendingLists(request *req, int apartThisOne); +request *hasIdenticalRequestInListOfSelectedRequests(request *req, request *list); +request* replaceInListOfSelectedRequests(request *oldRequest, request *newRequest, request *list); +int nbOfRelatedRequests(request *list); + +#endif diff --git a/executablecode/src_soclib/request_manager.c b/executablecode/src_soclib/request_manager.c new file mode 100755 index 0000000000..abc674dd0e --- /dev/null +++ b/executablecode/src_soclib/request_manager.c @@ -0,0 +1,583 @@ +#include <stdlib.h> +#include <pthread.h> +#include <time.h> + +#include "request_manager.h" +#include "request.h" +#include "myerrors.h" +#include "debug.h" +#include "mytimelib.h" +#include "random.h" +#include "asyncchannel.h" +#include "syncchannel.h" //ajoute DG +#include "tracemanager.h" + + +//DG 6.7. ici, il faut qu'on transmet egalement par les canaux MWMR; pour les syncchannels, c'etait totalement implicite et fait par le request manager tout seul!! j'ai ajoute des fonctions addMessageToSyncChannel(req->syncChannel, msg); et getAndRemoveMessageFromSyncChannel au fichier syncchannel.c + +void executeSendSyncTransaction(request *req) { + int cpt; + request *selectedReq; + + // At least one transaction available -> must select one randomly + // First: count how many of them are available + // Then, select one + // Broadcast the new condition! + + cpt = 0; + request* currentReq = req->syncChannel->inWaitQueue; + debugMsg("Execute send sync tr"); + + //un syncchannel n'a qu'un seul message; il manque encore le test si on peut ecrire + + while(currentReq != NULL) { + cpt ++; + currentReq = currentReq->next; + } + + cpt = random() % cpt; + + // Head of the list? + selectedReq = req->syncChannel->inWaitQueue; + while (cpt > 0) { + selectedReq = selectedReq->next; + cpt --; + } + + // Remove all related request from list requests + //req->syncChannel->inWaitQueue = removeRequestFromList(req->syncChannel->inWaitQueue, selectedReq);//enleve 21.09. + debugMsg("Setting related request"); + req->relatedRequest = selectedReq; + + // Select the selected request, and notify the information + selectedReq->selected = 1; + selectedReq->listOfRequests->selectedRequest = selectedReq; + + // Handle parameters + copyParameters(req, selectedReq); + + //ajoute DG 7.7. choix randomise du writer a deja ete fait!! + //sync_write(selectedReq->syncChannel->mwmr_fifo, selectedReq->ID);//pas clair quell donne est a transmettre, choisi ID + // fin ajoute + + debugMsg("Signaling"); + //DG 21.09. faut changer? + pthread_cond_signal(selectedReq->listOfRequests->wakeupCondition); + + traceSynchroRequest(req, selectedReq); +} + +void executeReceiveSyncTransaction(request *req) { + int cpt; + request *selectedReq; + + // At least one transaction available -> must select one randomly + // First: count how many of them are available + // Then, select one + // Broadcast the new condition! DG ce n'est pas un broadcast sur canal + + request* currentReq = req->syncChannel->outWaitQueue; + cpt = 0; + debugMsg("Execute receive sync tr"); + + while(currentReq != NULL) { + cpt ++; + //debugInt("cpt", cpt); + currentReq = currentReq->next; + } + cpt = random() % cpt; + selectedReq = req->syncChannel->outWaitQueue; + while (cpt > 0) { + selectedReq = selectedReq->next; + cpt --; + } + + //req->syncChannel->outWaitQueue = removeRequestFromList(req->syncChannel->outWaitQueue, selectedReq); + debugMsg("Setting related request"); + req->relatedRequest = selectedReq; + + // Select the request, and notify the information in the channel + selectedReq->selected = 1; + selectedReq->listOfRequests->selectedRequest = selectedReq; + + // Handle parameters + copyParameters(selectedReq, req); + + debugMsg("Signaling"); + pthread_cond_signal(selectedReq->listOfRequests->wakeupCondition); + + //ajoute DG + //sync_read(selectedReq->syncChannel->mwmr_fifo, selectedReq->ID); //pas clair quell donne est a transmettre, choisi ID + //fin ajoute + traceSynchroRequest(selectedReq, req); +} + + +void executeSendAsyncTransaction(request *req) { + request *selectedReq; + + // Full FIFO? + if (req->asyncChannel->currentNbOfMessages == req->asyncChannel->maxNbOfMessages) { + // Must remove the oldest message + getAndRemoveOldestMessageFromAsyncChannel(req->asyncChannel); + } + + addMessageToAsyncChannel(req->asyncChannel, req->msg); + + debugMsg("Signaling async write to all requests waiting "); + selectedReq = req->asyncChannel->inWaitQueue; + while (selectedReq != NULL) { + pthread_cond_signal(selectedReq->listOfRequests->wakeupCondition); + selectedReq = selectedReq->next; + } + debugMsg("Signaling done"); + + traceAsynchronousSendRequest(req); +} + +void executeReceiveAsyncTransaction(request *req) { + int i; + request *selectedReq; + + req->msg = getAndRemoveOldestMessageFromAsyncChannel(req->asyncChannel); + + selectedReq = req->asyncChannel->outWaitQueue; + + // Must recopy parameters + for(i=0; i<req->nbOfParams; i++) { + *(req->params[i]) = req->msg->params[i]; + } + + traceAsynchronousReceiveRequest(req); + + // unallocate message + destroyMessageWithParams(req->msg); + + debugMsg("Signaling async read to all requests waiting "); + while (selectedReq != NULL) { + pthread_cond_signal(selectedReq->listOfRequests->wakeupCondition); + selectedReq = selectedReq->next; + } + debugMsg("Signaling done"); +} + + +void executeSendBroadcastTransaction(request *req) { + int cpt; + request *tmpreq; + + // At least one transaction available -> must select all of them + // but at most one per task + // Then, broadcast the new condition! + + request* currentReq = req->syncChannel->inWaitQueue; + request* currentLastReq = req; + debugMsg("Execute broadcast sync tr"); + + + while(currentReq != NULL) { + tmpreq = hasIdenticalRequestInListOfSelectedRequests(currentReq, req->relatedRequest); + if (tmpreq != NULL) { + // Must select one of the two + // If =1, replace, otherwise, just do nothing + cpt = random() % 2; + if (cpt == 1) { + debugMsg("Replacing broadcast request"); + req->relatedRequest = replaceInListOfSelectedRequests(tmpreq, currentReq, req->relatedRequest); + currentReq->listOfRequests->selectedRequest = currentReq; + copyParameters(req, currentReq); + currentReq->selected = 1; + currentLastReq = req; + while(currentLastReq->relatedRequest != NULL) { + currentLastReq = currentLastReq->relatedRequest; + } + } + } else { + currentLastReq->relatedRequest = currentReq; + currentReq->relatedRequest = NULL; + currentReq->selected = 1; + currentReq->listOfRequests->selectedRequest = currentReq; + copyParameters(req, currentReq); + currentLastReq = currentReq; + } + + currentReq = currentReq->next; + + debugInt("Nb of requests selected:", nbOfRelatedRequests(req)); + } + + + debugMsg("Signaling"); + currentReq = req->relatedRequest; + cpt = 0; + while(currentReq != NULL) { + cpt ++; + pthread_cond_signal(currentReq->listOfRequests->wakeupCondition); + traceSynchroRequest(req, currentReq); + currentReq = currentReq->relatedRequest; + } + + debugInt("NUMBER of broadcast Requests", cpt); +} + + +int executable(setOfRequests *list, int nb) { + int cpt = 0; + //int index = 0; + request *req = list->head; + timespec ts; + int tsDone = 0; + + debugMsg("Starting loop"); + + list->hasATimeRequest = 0; + + while(req != NULL) { + if (!(req->delayElapsed)) { + if (req->hasDelay) { + // Is the delay elapsed??? + debugTime("begin time of list of request", &list->startTime); + debugTime("start time of this request", &req->myStartTime); + if (tsDone == 0) { + my_clock_gettime(&ts); + debugTime("Current time", &ts); + tsDone = 1; + } + + if (isBefore(&ts, &(req->myStartTime)) == 1) { + // Delay not elapsed + debugMsg("---------t--------> delay NOT elapsed"); + if (list->hasATimeRequest == 0) { + list->hasATimeRequest = 1; + list->minTimeToWait.tv_nsec = req->myStartTime.tv_nsec; + list->minTimeToWait.tv_sec = req->myStartTime.tv_sec; + } else { + minTime(&(req->myStartTime), &(list->minTimeToWait),&(list->minTimeToWait)); + } + } else { + // Delay elapsed + debugMsg("---------t--------> delay elapsed"); + req->delayElapsed = 1; + } + } else { + req->delayElapsed = 1; + } + } + req = req->nextRequestInList; + } + + req = list->head; + while((req != NULL) && (cpt < nb)) { + req->executable = 0; + if (req->delayElapsed) { + if (req->type == SEND_SYNC_REQUEST) { + debugMsg("Send sync"); + + if (req->syncChannel->inWaitQueue != NULL) { + debugMsg("Send sync executable"); + req->executable = 1; + cpt ++; + } else { + debugMsg("Send sync not executable"); + } + //index ++; + } + + if (req->type == RECEIVE_SYNC_REQUEST) { + debugMsg("receive sync"); + if (req->syncChannel->outWaitQueue != NULL) { + req->executable = 1; + cpt ++; + } + //index ++; + } + + if (req->type == SEND_ASYNC_REQUEST) { + debugMsg("Send async"); + + if (!(req->asyncChannel->isBlocking)) { + // Can always add a message -> executable + debugMsg("Send async executable since non blocking"); + req->executable = 1; + cpt ++; + + //blocking case ... channel full? + } else { + if (req->asyncChannel->currentNbOfMessages < req->asyncChannel->maxNbOfMessages) { + // Not full! + debugMsg("Send async executable since channel not full"); + req->executable = 1; + cpt ++; + } else { + debugMsg("Send async not executable: full, and channel is blocking"); + } + } + } + + if (req->type == RECEIVE_ASYNC_REQUEST) { + debugMsg("receive async"); + if (req->asyncChannel->currentNbOfMessages >0) { + debugMsg("Receive async executable: not empty"); + req->executable = 1; + cpt ++; + } else { + debugMsg("Receive async not executable: empty"); + } + //index ++; + } + + + if (req->type == SEND_BROADCAST_REQUEST) { + debugMsg("send broadcast"); + req->executable = 1; + cpt ++; + } + + if (req->type == RECEIVE_BROADCAST_REQUEST) { + debugMsg("receive broadcast"); + // A receive broadcast is never executable + req->executable = 0; + //index ++; + } + + + + + if (req->type == IMMEDIATE) { + debugMsg("immediate"); + req->executable = 1; + cpt ++; + } + } + + req = req->nextRequestInList; + + } + + return cpt; +} + +void private__makeRequestPending(setOfRequests *list) { + request *req = list->head; + while(req != NULL) { + if ((req->delayElapsed) && (!(req->alreadyPending))) { + if (req->type == SEND_SYNC_REQUEST) { + debugMsg("Adding pending request in outWaitqueue"); + req->syncChannel->outWaitQueue = addToRequestQueue(req->syncChannel->outWaitQueue, req); + req->alreadyPending = 1; + } + + if (req->type == RECEIVE_SYNC_REQUEST) { + debugMsg("Adding pending request in inWaitqueue"); + req->alreadyPending = 1; + req->syncChannel->inWaitQueue = addToRequestQueue(req->syncChannel->inWaitQueue, req); + } + + if (req->type == SEND_ASYNC_REQUEST) { + debugMsg("Adding pending request in outWaitqueue"); + req->asyncChannel->outWaitQueue = addToRequestQueue(req->asyncChannel->outWaitQueue, req); + req->alreadyPending = 1; + } + + if (req->type == RECEIVE_ASYNC_REQUEST) { + debugMsg("Adding pending request in inWaitqueue"); + req->alreadyPending = 1; + req->asyncChannel->inWaitQueue = addToRequestQueue(req->asyncChannel->inWaitQueue, req); + } + + if (req->type == RECEIVE_BROADCAST_REQUEST) { + debugMsg("Adding pending broadcast request in inWaitqueue"); + req->alreadyPending = 1; + req->syncChannel->inWaitQueue = addToRequestQueue(req->syncChannel->inWaitQueue, req); + } + + if (req->type == SEND_BROADCAST_REQUEST) { + debugMsg("Adding pending broadcast request in outWaitqueue"); + req->alreadyPending = 1; + req->syncChannel->outWaitQueue = addToRequestQueue(req->syncChannel->outWaitQueue, req); + } + + } + + req = req->nextRequestInList; + } +} + +void private__makeRequest(request *req) { + if (req->type == SEND_SYNC_REQUEST) { + executeSendSyncTransaction(req); + } + + if (req->type == RECEIVE_SYNC_REQUEST) { + executeReceiveSyncTransaction(req); + } + + if (req->type == SEND_ASYNC_REQUEST) { + executeSendAsyncTransaction(req); + } + + if (req->type == RECEIVE_ASYNC_REQUEST) { + executeReceiveAsyncTransaction(req); + } + + if (req->type == SEND_BROADCAST_REQUEST) { + executeSendBroadcastTransaction(req); + } + + // IMMEDIATE: Nothing to do + + // In all cases: remove other requests of the same list from their pending form + debugMsg("Removing original req"); + removeAllPendingRequestsFromPendingLists(req, 1); + removeAllPendingRequestsFromPendingListsRelatedRequests(req); + /*if (req->relatedRequest != NULL) { + debugMsg("Removing related req"); + removeAllPendingRequestsFromPendingLists(req->relatedRequest, 0); + }*/ + +} + +void removeAllPendingRequestsFromPendingListsRelatedRequests(request *req) { + if (req->relatedRequest != NULL) { + debugMsg("Removing related req"); + removeAllPendingRequestsFromPendingLists(req->relatedRequest, 0); + // Recursive call + removeAllPendingRequestsFromPendingListsRelatedRequests(req->relatedRequest); + } +} + + +request *private__executeRequests0(setOfRequests *list, int nb) { + int howMany, found; + int selectedIndex, realIndex; + request *selectedReq; + request *req; + + // Compute which requests can be executed + debugMsg("Counting requests"); + howMany = executable(list, nb); + + debugInt("Counting requests=", howMany); + + if (howMany == 0) { + debugMsg("No pending requests"); + // Must make them pending + + private__makeRequestPending(list); + + return NULL; + } + + debugInt("At least one pending request is executable", howMany); + + + // Select a request + req = list->head; + selectedIndex = (rand() % howMany)+1; + debugInt("selectedIndex=", selectedIndex); + realIndex = 0; + found = 0; + while(req != NULL) { + if (req->executable == 1) { + found ++; + if (found == selectedIndex) { + break; + } + } + realIndex ++; + req = req->nextRequestInList; + } + + debugInt("Getting request at index", realIndex); + selectedReq = getRequestAtIndex(list, realIndex); + selectedReq->selected = 1; + selectedReq->listOfRequests->selectedRequest = selectedReq; + + debugInt("Selected request of type", selectedReq->type); + + // Execute that request + private__makeRequest(selectedReq); + + return selectedReq; +} + + +request *private__executeRequests(setOfRequests *list) { + // Is a request already selected? + + if (list->selectedRequest != NULL) { + return list->selectedRequest; + } + + debugMsg("No request selected -> looking for one!"); + + return private__executeRequests0(list, nbOfRequests(list)); +} + + + + +request *executeOneRequest(setOfRequests *list, request *req) { + req->nextRequestInList = NULL; + req->listOfRequests = list; + list->head = req; + return executeListOfRequests(list); +} + + +void setLocalStartTime(setOfRequests *list) { + request *req = list->head; + + while(req != NULL) { + if (req->hasDelay) { + req->delayElapsed = 0; + addTime(&(list->startTime), &(req->delay), &(req->myStartTime)); + debug2Msg(list->owner, " -----t------>: Request with delay"); + } else { + req->delayElapsed = 1; + req->myStartTime.tv_nsec = list->startTime.tv_nsec; + req->myStartTime.tv_sec = list->startTime.tv_sec; + } + req = req->nextRequestInList; + } +} + + +// Return the executed request +request *executeListOfRequests(setOfRequests *list) { + request *req; + + my_clock_gettime(&list->startTime); + list->selectedRequest = NULL; + setLocalStartTime(list); + + // Try to find a request that could be executed + debug2Msg(list->owner, "Locking mutex"); + pthread_mutex_lock(list->mutex); + debug2Msg(list->owner, "Mutex locked"); + + debug2Msg(list->owner, "Going to execute request"); + + while((req = private__executeRequests(list)) == NULL) { + debug2Msg(list->owner, "Waiting for request!"); + if (list->hasATimeRequest == 1) { + debug2Msg(list->owner, "Waiting for a request and at most for a given time"); + debugTime("Min time to wait=", &(list->minTimeToWait)); + pthread_cond_timedwait(list->wakeupCondition, list->mutex, &(list->minTimeToWait)); + } else { + debug2Msg(list->owner, "Releasing mutex"); + //DG 21.09. enlever + //pthread_cond_wait(list->wakeupCondition, list->mutex); + } + debug2Msg(list->owner, "Waking up for requests! -> getting mutex"); + } + + debug2Msg(list->owner, "Request selected!"); + + my_clock_gettime(&list->completionTime); + + pthread_mutex_unlock(list->mutex); + debug2Msg(list->owner, "Mutex unlocked"); + return req; +} + diff --git a/executablecode/src_soclib/request_manager.h b/executablecode/src_soclib/request_manager.h new file mode 100755 index 0000000000..e2ae0f8000 --- /dev/null +++ b/executablecode/src_soclib/request_manager.h @@ -0,0 +1,14 @@ +#ifndef REQUEST_MANAGER_H +#define REQUEST_MANAGER_H + + +#include "request.h" +#include "syncchannel.h" + + +request *executeOneRequest(setOfRequests *list, request *req); +request *executeListOfRequests(setOfRequests *list); + +void removeAllPendingRequestsFromPendingListsRelatedRequests(request *req); + +#endif diff --git a/executablecode/src_soclib/syncchannel.c b/executablecode/src_soclib/syncchannel.c new file mode 100755 index 0000000000..a21d29afd1 --- /dev/null +++ b/executablecode/src_soclib/syncchannel.c @@ -0,0 +1,112 @@ +#include <stdlib.h> + + +#include "syncchannel.h" +#include "request.h" +#include "myerrors.h" +#include "debug.h" +#include "mwmr.h" +//#include "random.h" + +/* this function empties a channel and is called after one send or receive transaction + */ + +void mwmr_sync_flush(struct mwmr_s *fifo){ + int i=1; + while(i){ + i = mwmr_try_read(fifo,NULL,1); + } +} + +/* all synchronous communications use MWMR channels of size 1, enforcing synchronization */ + +void sync_read( struct mwmr_s *fifo, void *_ptr){ + int in; + while(1){ + /* loop until one single message has been read successfully */ + if(!(in=mwmr_try_read(fifo,_ptr,1))) continue; //le contenu de _ptr ne joue aucun role + } + return; +} + +/* in the case of multi_writer one channel per writer */ +/* we choose ramdomly one of the channels */ +/* the problem is to identify the channels on the side of the writers which can be in different blocks */ + +void sync_read_random( struct mwmr_s *fifo[], void *_ptr, int nb_writers){ + int in; + int rand = computeRandom(0, nb_writers-1); + while(1){ + /* loop until one single message has been read successfully */ + rand = computeRandom(0, nb_writers-1); + if(!(in=mwmr_try_read(fifo[rand],_ptr,1))) continue; + } + return; +} + +void sync_write(struct mwmr_s *fifo, void *_ptr){ + int out; + out=mwmr_try_write(fifo,NULL,1); + if(out==0){ + + printf("message lost\n"); + } + return; +} + +/* the task issueing the message does not continue until THIS PARTICULAR message has been successfully taken by another task; an additional empty sync message is issued for that purpose, in a busy waiting loop; once synchronization has been achieved, this message is flushed and a blocking write initiated */ + +/*void sync_write_random(struct mwmr_s *fifo[], void *_ptr, int id_writer){ + int out; + while(1){ + mwmr_lock(fifo[id_writer]); + out=mwmr_try_write(fifo[id_writer],NULL,1); + if(out==0){ + mwmr_unlock(fifo[id_writer]); + continue; + } + mwmr_sync_flush(fifo[id_writer]); + mwmr_write(fifo[id_writer],_ptr,1); + mwmr_unlock(fifo[id_writer]); + } + return; + }*/ + +//DG 7.9. add MWMR as parameter +//syncchannel *getNewSyncchannel(char *outname, char *inname, struct mwmr_s *fifo) { +syncchannel *getNewSyncchannel(char *outname, char *inname) { + syncchannel * syncch = (syncchannel *)(malloc(sizeof(struct syncchannel))); + if (syncch == NULL) { + criticalError("Allocation of request failed"); + } + syncch->inname = inname; + syncch->outname = outname; + syncch->inWaitQueue = NULL; + syncch->outWaitQueue = NULL; + syncch->isBroadcast = false; + //DG 7.9. add MWMR + //syncch->mwmr_fifo=fifo; + return syncch; +} + +void setBroadcast(syncchannel *syncch, bool b) { + syncch->isBroadcast = b; +} + +/*request *makeNewSendSync(int hasDelay, long delay, int nbOfParams, int *params[]) { + request *req = getNewRequest(SEND_SYNC_REQUEST, hasDelay, delay, nbOfParams, params); + return req; +} + +request *makeNewReceiveSync(int hasDelay, long delay, int nbOfParams, int *params[]) { + request *req = getNewRequest(RECEIVE_SYNC_REQUEST, hasDelay, delay, nbOfParams, params); + return req; + }*/ + +void destroySyncchannel(syncchannel *syncch) { + free(syncch); + + + + +} diff --git a/executablecode/src_soclib/syncchannel.h b/executablecode/src_soclib/syncchannel.h new file mode 100755 index 0000000000..b4ac61649e --- /dev/null +++ b/executablecode/src_soclib/syncchannel.h @@ -0,0 +1,30 @@ +#ifndef SYNCCHANNEL_H +#define SYNCCHANNEL_H + +struct syncchannel; + +#include "request.h" +#include "defs.h" + +struct syncchannel { + char *outname; + char *inname; + struct request* inWaitQueue; + struct request* outWaitQueue; + bool isBroadcast; + //struct mwmr_s *mwmr_fifo;//DG enleve 21.09. +}; + +typedef struct syncchannel syncchannel; + + +void setBroadcast(syncchannel *syncch, bool b); +//DG 7.9. add MWMR as parameter +//syncchannel *getNewSyncchannel(char *inname, char *outname, struct mwmr_s *fifo); +syncchannel *getNewSyncchannel(char *inname, char *outname); +//request *makeNewSendSync(int hasDelay, long delay, int nbOfParams, int *params[]); +//request *makeNewReceiveSync(int hasDelay, long delay, int nbOfParams, int *params[]); +void destroySyncchannel(syncchannel *syncch); + + +#endif diff --git a/executablecode/src_soclib/tracemanager.c b/executablecode/src_soclib/tracemanager.c new file mode 100755 index 0000000000..fda3a0db1c --- /dev/null +++ b/executablecode/src_soclib/tracemanager.c @@ -0,0 +1,304 @@ +#include <stdlib.h> +#include <stdio.h> +#include <time.h> + +#include "tracemanager.h" +#include "debug.h" +#include "mytimelib.h" + + +#define TRACE_OFF 0 +#define TRACE_IN_FILE 1 +#define TRACE_IN_CONSOLE 2 + +#define TRACE_FILE_NAME "Trace.txt" + + +pthread_mutex_t __traceMutex; + +int trace = TRACE_OFF; +int id = 0; + +FILE *file; + +struct timespec begints; + + +void addInfo(char *dest, char *info) { + //char s1[10]; + long tmp; + //long tmp1; + //int i; + struct timespec ts, ts1; + my_clock_gettime(&ts); + + debugMsg("DIFF TIME"); + diffTime(&begints, &ts, &ts1); + + tmp = ts1.tv_nsec; + + if (tmp < 0) { + tmp = -tmp; + } + + /*tmp1 = 100000000; + + for(i=0; i<9; i++) { + s1[i] = 48 + (tmp / tmp1); + tmp = tmp % tmp1; + tmp1 = tmp1 / 10; + } + s1[9] = '\0';*/ + + /* s1 -> tmp */ + sprintf(dest, "#%d time=%ld.%09ld %s", id, ts1.tv_sec, tmp, info); + id ++; +} + + +void writeInTrace(char *info) { + pthread_mutex_lock(&__traceMutex); + char s[CHAR_ALLOC_SIZE]; + addInfo(s, info); + //printf("Write in file\n"); + switch(trace){ + case TRACE_IN_FILE: + if (file != NULL) { + debug2Msg("Saving in file", s); + fprintf(file, s); + fflush(file); + } + break; + case TRACE_IN_CONSOLE: + printf("%s\n", s); + break; + } + + pthread_mutex_unlock(&__traceMutex); +} + + +void activeTracingInFile(char *fileName) { + char *name; + trace = TRACE_IN_FILE; + my_clock_gettime(&begints); + if (fileName == NULL) { + name = TRACE_FILE_NAME; + } else { + name = fileName; + } + file = fopen(name,"w"); + + /* Initializing mutex */ + if (pthread_mutex_init(&__traceMutex, NULL) < 0) { exit(-1);} +} + +void activeTracingInConsole() { + trace = TRACE_IN_CONSOLE; + my_clock_gettime(&begints); + + /* Initializing mutex */ + if (pthread_mutex_init(&__traceMutex, NULL) < 0) { exit(-1);} +} + +void unactiveTracing() { + trace = TRACE_OFF; +} + + +void traceStateEntering(char *myname, char *statename) { + char s[CHAR_ALLOC_SIZE]; + + debugMsg("Trace function"); + + if (trace == TRACE_OFF) { + return; + } + + sprintf(s, "block=%s type=state_entering state=%s\n", myname, statename); + + // Saving trace + writeInTrace(s); +} + +void traceFunctionCall(char *block, char *func, char *params) { + char s[CHAR_ALLOC_SIZE]; + + debugMsg("Trace function"); + + if (trace == TRACE_OFF) { + return; + } + + sprintf(s, "block=%s type=function_call func=%s parameters=%s\n", block, func, params); + + // Saving trace + writeInTrace(s); +} + + +// type=0: int type = 1:bool +void traceVariableModification(char *block, char *var, int value, int type) { + char s[CHAR_ALLOC_SIZE]; + debugMsg("Trace variable modification"); + + if (trace == TRACE_OFF) { + return; + } + + + if (type == 0) { + sprintf(s, "block=%s type=variable_modification variable=%s setTo=%d\n", block, var, value); + } + + if (type == 1) { + if (value == 0) { + sprintf(s, "block=%s type=variable_modification variable=%s setTo=false\n", block, var); + } else { + sprintf(s, "block=%s type=variable_modification variable=%s setTo=true\n", block, var); + } + } + + // Saving trace + writeInTrace(s); + +} + +void traceSynchroRequest(request *from, request *to) { + char s[1024]; + int i; + + if (trace == TRACE_OFF) { + return; + } + + sprintf(s, "block=%s blockdestination=%s type=synchro channel=%s params=", from->listOfRequests->owner, to->listOfRequests->owner, from->syncChannel->outname); + for(i=0; i<from->nbOfParams; i++) { + if (i>0) { + sprintf(s, "%s,", s); + } + sprintf(s, "%s%d", s, *(from->params[i])); + } + sprintf(s, "%s\n", s); + + debugMsg("Trace request synchro"); + + + // Saving trace + writeInTrace(s); +} + + +void traceAsynchronousSendRequest(request *req) { + char s[1024]; + int i; + + if (trace == TRACE_OFF) { + return; + } + + sprintf(s, "block=%s type=send_async channel=%s msgid=%ld params=", req->listOfRequests->owner, req->asyncChannel->outname, req->msg->id); + if (req->msg != NULL) { + debugMsg("Computing params"); + for(i=0; i<req->msg->nbOfParams; i++) { + if (i>0) { + sprintf(s, "%s,", s); + } + sprintf(s, "%s%d", s, req->msg->params[i]); + } + } + sprintf(s, "%s\n", s); + + + + // Saving trace + writeInTrace(s); +} + + +void traceAsynchronousReceiveRequest(request *req) { + char s[1024]; + int i; + + if (trace == TRACE_OFF) { + return; + } + + sprintf(s, "block=%s type=receive_async channel=%s msgid=%ld params=", req->listOfRequests->owner, req->asyncChannel->outname, req->msg->id); + if (req->msg != NULL) { + debugMsg("Computing params"); + for(i=0; i<req->msg->nbOfParams; i++) { + if (i>0) { + sprintf(s, "%s,", s); + } + sprintf(s, "%s%d", s, req->msg->params[i]); + } + } + sprintf(s, "%s\n", s); + + + + // Saving trace + writeInTrace(s); +} + + + +void traceRequest(char *myname, request *req) { + char s[1024]; + int i; + + + debugMsg("Trace request"); + + + if (trace == TRACE_OFF) { + return; + } + + // Build corresponding char*; + + switch(req->type) { + case SEND_SYNC_REQUEST: + debug2Msg("Sync channel", req->syncChannel->outname); + sprintf(s, "block=%s type=send_synchro channel=%s params=", myname, req->syncChannel->outname); + for(i=0; i<req->nbOfParams; i++) { + if (i>0) { + sprintf(s, "%s,", s); + } + sprintf(s, "%s%d", s, *(req->params[i])); + } + sprintf(s, "%s\n", s); + + break; + case RECEIVE_SYNC_REQUEST: + sprintf(s, "block=%s type=receive_synchro channel=%s\n", myname, req->syncChannel->inname); + break; + case SEND_ASYNC_REQUEST: + debug2Msg("Async channel", req->asyncChannel->outname); + sprintf(s, "block=%s type=send_async_2 channel=%s\n", myname, req->asyncChannel->outname); + break; + case RECEIVE_ASYNC_REQUEST: + sprintf(s, "block=%s type=receive_async_2 channel=%s\n", myname, req->asyncChannel->inname); + break; + case SEND_BROADCAST_REQUEST: + debug2Msg("Sync channel", req->syncChannel->outname); + sprintf(s, "block=%s type=send_broadcast channel=%s\n", myname, req->syncChannel->outname); + break; + case RECEIVE_BROADCAST_REQUEST: + debug2Msg("Sync channel", req->syncChannel->outname); + sprintf(s, "block=%s type=receive_broadcast channel=%s\n", myname, req->syncChannel->outname); + break; + case IMMEDIATE: + sprintf(s, "block=%s type=action\n", myname); + break; + default: + sprintf(s, "block=%s type=unknown\n", myname); + } + + debugMsg("Trace request 2"); + + + // Saving trace + writeInTrace(s); +} diff --git a/executablecode/src_soclib/tracemanager.h b/executablecode/src_soclib/tracemanager.h new file mode 100755 index 0000000000..fb92d9fe6c --- /dev/null +++ b/executablecode/src_soclib/tracemanager.h @@ -0,0 +1,21 @@ +#ifndef TRACEMANAGER_H +#define TRACEMANANER_H + +#include "request.h" + +#define CHAR_ALLOC_SIZE 1024 + + +void activeTracingInFile(); +void unactiveTracing(); +void traceRequest(char *myname, request *req); +void traceFunctionCall(char *block, char *func, char* params); +void traceVariableModification(char *block, char *var, int value, int type); // type=0: int type = 1:bool +void traceStateEntering(char *myname, char *statename); +void traceSynchroRequest(request *from, request *to); +void traceAsynchronousSendRequest(request *req); +void traceAsynchronousReceiveRequest(request *req); + +#endif + + -- GitLab