Inter-process communication avec $SYSTEM.Event
Salut les devs,
Actuellement, je travaille sur un projet qui requiert une gestion hautement dynamique des événements. Dans le contexte du langage de programmation Java, mon premier choix aurait été d'opter instinctivement pour l'utilisation du "Observer Pattern". Le "Observer Pattern" représente une approche pour gérer les interactions entre les objets en établissant un mécanisme de notification. Il permet à plusieurs observateurs de réagir de manière autonome aux changements d'état d'un sujet, favorisant ainsi la flexibilité et la modularité du code. Si vous n'êtes pas familier avec ce modèle de conception, vous pouvez trouver de plus amples informations à ce sujet sur ceWikipedia
Bien que ce soit naturel et couramment utilisé dans certains langages de programmations comme le Java ou le C++, en ObjectScript pas du tout.
Ces langages diffèrent considérablement, Java permet de lancer des threads, tandis qu'en ObjectScript, nous lançons de nouveaux processus. Cette différence est notable dans plusieurs aspects :
- Les threads: partagent la même mémoire, ce qui facilite la communication et le partage de données entre eux. Cependant, cela peut également entraîner des problèmes de concurrence si la synchronisation n'est pas gérée correctement.
- Les processus: sont des exécutions indépendantes avec leur propre espace mémoire. Ils offrent une isolation complète les uns par rapport aux autres, ce qui signifie qu'un processus ne peut pas accéder directement à la mémoire d'un autre. En raison de leur isolation, les processus sont plus sûrs en cas d'erreurs, car un processus défectueux n'affectera pas les autres processus du système. Cependant, la communication entre processus est plus complexe que la communication entre threads, car elle nécessite des mécanismes de communication inter-processus (IPC) tels que les sockets, les tubes (pipes), ou les files de messages.
Cette différence fondamentale demande une approche différente. Cependant, ne serait-il pas envisageable de développer rapidement une bibliothèque pour résoudre ce problème? C'est la question à laquelle je me suis confronté, et je vais à présent vous présenter une implémentation. Comme c'est souvent le cas dans mes articles, vous trouverez les sources sur un repository GitHub, ainsi qu'un package disponible sur Open Exchange.
L'objectif n'est pas de reproduire l'Observer Pattern, mais plutôt de développer une solution permettant de faciliter la communication inter-processus, de manière à notifier les processus en attente d'un événement.
Prenons l'exemple d'un processus qui gère la création de patients. Dans ce scénario, nous pourrions représenter la séquence d'activités suivante :
.png)
Le processus A informe le gestionnaire des événements "Patient" et est chargé d'envoyer un message à tous les observateurs enregistrés. Pour ce faire, il est essentiel de déterminer comment mettre en place la communication inter-processus. En consultant la documentation, j'ai rapidement découvert une classe parfaite pour résoudre ce problème $SYSTEM.Event.
Vous pouvez effectuer un test rapide en ouvrant deux sessions terminal IRIS. La première session est destinée à créer la ressource et à écouter les messages entrants, tandis que la seconde session est utilisée pour envoyer des notifications
|
|
Suite à l’envoi du signal, le contenu de la variable "message" sera :
$lb(1,"Hello, this is a message from PID:710")La première position est un code de retour:
- 1: message reçu.
- 0: timeout (10 secondes dans l’exemple ci-dessus).
- -1: situation d’erreur ou la resource “MyResource” serait supprimée.
Note: Il n'est pas obligatoire de créer la ressource avec $SYSTEM.Event.Create, dans ce cas, l'attente peut se faire avec la ligne suivante:
Set message = $SYSTEM.Event.WaitMsg("",10) Il faut alors que le signal précise le PID du processus en écoute:
$SYSTEM.Event.Signal(<pid>, "Hello from:"_$Job)Dans cet exemple, nous avons transmis une simple chaîne de caractères, car il n'est pas possible de passer directement des objets. Comme expliqué précédemment dans cet article, un processus ne peut pas accéder à la mémoire d'un autre processus, ce qui rend impossible la transmission de références d'objets. Toutefois, grâce à l'existence de la classe %DynamicObject, nous avons la possibilité de convertir facilement des objets en chaînes de caractères JSON et vice versa. Ainsi, nous pouvons faire :
Do$SYSTEM.Event.Signal("MyResource",{"PID":($JOB),"Msg":"Hello!"}.%ToJSON())
; Et lors de la réception du message : Set object = {}.%FromJSON($ListGet(message,2))De plus, si vous travaillez avec une version récente d'IRIS, vous avez la possibilité d'étendre vos classes existantes en utilisant %JSON.Adaptor
et d'exporter facilement leurs instances avec la méthode %JSONExportToString.
En combinant l'utilisation de $SYSTEM.Event et %DynamicObject, vous pouvez réaliser en seulement quelques lignes la communication entre des processus, en envoyant aussi bien des messages simples que des objets.
Il est important de noter que bien que $SYSTEM.Event soit très pratique, il ne fonctionnera pas sur un système ECP (Enterprise Cache Protocol), comme mentionné dans la documentation officielle :
There is no networking support for these functions - processes can only wait on and awaken resources on the same system.
Il est impératif d'exercer une grande prudence lors de l'utilisation de $SYSTEM.Event, car il repose sur "SharedMemory", un espace limité et essentiel pour le fonctionnement du système. Si un processus envoie un grand nombre de signaux et que le processus de destination ne peut pas les traiter suffisamment rapidement, la file d'attente de signaux va s'allonger, ce qui peut entraîner un effondrement de votre système. Voici un aperçu de ce que vous pourriez retrouver dans le fichier "messages.log" :
10/04/23-08:33:47:501 (1057) 1 [Generic.Event] SMH Surrender Stage 1 started.
10/04/23-08:33:47:542 (1057) 1 [Generic.Event] SMH Surrender Stage 2 started.
10/04/23-08:33:48:547 (818) 2 [Utility.Event] ISCLOG: WorkMgr appendError: Error ns=%SYS rtn=%SYS.WorkQueueMgr data=$lb("rc","0 "_$lb($lb(5002,"<STORE>RunDaemon+167^%SYS.WorkQueueMgr",,,,,,,,$lb(,"%SYS",$lb("D^RunDaemon+167^%SYS.WorkQueueMgr +1","D^StartWorkDaemon+4^STU +1"))))/* ERROR #5002: ObjectScript error: <STORE>RunDaemon+167^%SYS.WorkQueueMgr */,"group",,"stack",$lb("d^RunDaemon+226^%SYS.WorkQueueMgr^1","d^StartWorkDaemon+4^STU^1","d^^^0"),"$zu(56,2)","$Id: //iris/2023.2.0/kernel/common/src/events.c#1 $ 568 4")
10/04/23-08:34:00:202 (1033) 2 [Utility.Event] [SYSTEM MONITOR] SMHState Alert: Shared Memory Heap state Troubled
10/04/23-08:34:00:978 (1032) 0 [Utility.Event] Task Manager Error from CheckSchedule - Error 0
10/04/23-08:34:51:308 (1063) 2 [Utility.Event] ISCLOG: CSPServer Error displaying login page $ZE= ns=%SYS rtn=%SYS.cspServer
10/04/23-08:35:00:290 (1033) 2 [Utility.Event] [SYSTEM MONITOR] SMHPercentFull Alert: SMHPercentFull = 100, 100, 100 (Max value is 98).Pour prévenir une telle situation, il est possible d'opter pour une communication bidirectionnelle. Lorsqu'un signal est envoyé, le processus attend une confirmation "ACK" avant de poursuivre, comme illustré dans l'exemple suivant ::
Do$SYSTEM.Event.Create("MyResource")
Write !,"Type < ctrl+c > to stop listening."Try {
For {
Set result = $SYSTEM.Event.WaitMsg("MyResource",10)
Set code = $ListGet(result,1)
Continue:code=0Quit:code=-1Set event = {}.%FromJSON($ListGet(result,2))
Write !, "Content : ", event.Content
Do$SYSTEM.Event.Signal(event.PID) ; Send aknowledgement
}
} Catch ex {
If$SYSTEM.Event.Defined("MyResource") {
Do$SYSTEM.Event.Clear("MyResource")
Do$SYSTEM.Event.Delete("MyResource")
}
}
For i=1:1:100 {
Do$SYSTEM.Event.Signal("MyResource",{"PID":($Job),"Content":"Hello!"})
Set code = $SYSTEM.Event.Wait("",3) ; Wait acknowledgementIf code < 1 {
Write"Something went wrong."If$SYSTEM.Event.Defined("MyResource") {
Do$SYSTEM.Event.Clear("MyResource")
Do$SYSTEM.Event.Delete("MyResource")
}
Quit
}
}
En adoptant cette approche, la file d'attente de votre processus ne contiendra jamais plus d'un message à la fois. Assurez-vous également de toujours effectuer une opération "Clear" avant le "Delete" pour éviter que des messages non traités ne restent indéfiniment dans la file d'attente.
Maintenant que nous avons établi les bases, nous pouvons commencer par implémenter une classe de base "abstraite" qui jouera le rôle de Listener.
Class dc.ipcutils.ListenerAbstract Extends%RegisteredObject
{
Parameter EVENTTYPE;Parameter WAITTIMEOUT = 10;Parameter VERBOSE = 0;Property ResourceName As%String [ Internal ];Property Verbose As%Boolean [ InitialExpression = {$Get(%zverbose,..#VERBOSE)}, Internal ];Property EventType As%String [ InitialExpression = {..#EVENTTYPE} ];Property Event As%DynamicObject;/// could be a string or a dynamicobjectProperty Data;Property Context As%DynamicObject;Property LastState As%Integer [ InitialExpression = 0 ];
Method OnStartListen(Context As%DynamicObject = {{}}) As%Status
{
Set..ResourceName = ..GenResourceName()
Do$SYSTEM.Event.Create(..ResourceName), ##class(dc.ipcutils.Manager).Subscribe(##this, Context)
Write:..Verbose !, $zdt($h,3,1), " + Listening ", ..EventType, " with resourcename ", ..ResourceName, " started."Quit$$$OK
}
Method Listen() As%Status
{
Set sc = $$$OK$$$QuitOnError(..OnStartListen())
Try {
ForIf..Wait() = -1$$$ThrowStatus($$$ERROR($$$GeneralError,$$$FormatText("Resource %1 deleted.",..ResourceName)))
} Catch Ex {
If Ex.Name '[ "<INTERRUPT>"Set sc = Ex.AsStatus()
}
Quit$$$ADDSC(sc,..OnStopListen())
}
Method Wait(
TimeOut As%Integer = {..#WAITTIMEOUT},
sc As%Status = {$$$OK}) As%Integer
{
Do:..LastState=1..SendAck() ; We have to send a ack before waiting a new incoming message.Set result = $SYSTEM.Event.WaitMsg(..ResourceName, TimeOut)
Set..LastState = $ListGet(result,1)
If..LastState < 1Quit..LastStateSet..Event = {}.%FromJSON($ListGet(result,2))
Set..Data = ..Event.Data
Set..Context = ..Event.Context
Do..Event.%Remove("Data")
Do..Event.%Remove("Context")
Try {
Do..Update(..Event, ..Data, ..Context)
} Catch (ex) {
Set sc = ex.AsStatus()
Set^Listener.Err("last") = $zdt($h,3,1)_" "_$SYSTEM.Status.GetOneErrorText(sc)
}
Quit..LastState
}
Method SendAck()
{
Do$SYSTEM.Event.Signal(..Event.PIDSource,..Event.%ToJSON())
}
Method Update(
EventObject As%DynamicObject,
Data As%DynamicObject,
Context As%DynamicObject) As%Status
{
Quit$$$OK
}
Method WaitEvent(
Output Event As%DynamicObject,
TimeOut As%Integer = {..#WAITTIMEOUT}) As%Integer
{
Set result = $SYSTEM.Event.WaitMsg(..ResourceName, TimeOut), returnCode = $ListGet(result,1), Event = ""If returnCode < 1Quit returnCode
Set..Event = {}.%FromJSON($ListGet(result,2))
Set..Data = Event.Data
Set..Context = Event.Context
Do..Event.%Remove("Data"), ..Event.%Remove("Context")
Quit returnCode
}
Method OnStopListen(Context As%DynamicObject = {{}}) As%Status
{
Write:..Verbose !, $zdt($h,3,1), " - Listening ", ..EventType, " with resourcename ", ..ResourceName, " has been STOPPED."Do:$SYSTEM.Event.Defined(..ResourceName) $SYSTEM.Event.Clear(..ResourceName), $SYSTEM.Event.Delete(..ResourceName)
Quit##class(dc.ipcutils.Manager).UnSubscribe(##this, Context)
}
Method GenResourceName() As%String [ CodeMode = expression, Private ]
{
$Translate($SYSTEM.Encryption.Base64Encode($Job_$zcrc(..EventType_$ZDT($H,3,1),7)),"=")
}
}
Les listeners doivent s'inscrire auprès d'un "Manager". Passons maintenant à l'écriture de ce manager, qui permettra de gérer les inscriptions, les désinscriptions et les notifications :
Include ipcutils
Class dc.ipcutils.Manager
{
Parameter ACKTIMEOUT = 3;ClassMethod Subscribe(
Observer As dc.ipcutils.ListenerAbstract,
Context As%DynamicObject = {{}}) As%Status
{
Set$$$Subscribed(Observer.EventType, $Classname(Observer), Observer.ResourceName) = $ListBuild(Observer.EventType,Observer.ResourceName,$zdt($H,3,1),$Job,Context.%ToJSON())
Quit$$$OK
}
ClassMethod UnSubscribe(
Observer As%String,
Context As%DynamicObject) As%Status
{
Kill$$$Subscribed(Observer.EventType, $Classname(Observer), Observer.ResourceName)
Quit$$$OK
}
ClassMethod Notify(
Event As%String,
Data As%DynamicObject) As%Status
{
#def1arg ResourceName(%val) $QSubscript(%val,3)
#def1arg Context(%val) {}.%FromJSON($ListGet(%val,5))
#dim NotifyObject As%DynamicObject = ##class(dc.ipcutils.Manager).GenEventObject(Event)
Set sc = $$$OKSet node = $Name($$$Subscribed(NotifyObject.EventType))
For {
Set node = $Query(@node,1,value)
Quit:node=""||(NotifyObject.EventType'=$QSubscript(node,1))
Set resourceName = $$$ResourceName(node)
Set NotifyObject.Context = $$$Context(value), NotifyObject.Data = Data
; check if the resource exists AND if process that create the resource still exists! If '$System.Event.Defined(resourceName) || '$Data(^$JOB($ListGet(value, 4))) Do UnSubscribe ContinueDo$SYSTEM.Event.Signal(resourceName, NotifyObject.%ToJSON())
Do WaitAck
Do NotifyObject.%Remove("Context"), NotifyObject.%Remove("Data")
}
Quit sc
UnSubscribe
Do:$SYSTEM.Event.Defined(resourceName) $SYSTEM.Event.Clear(resourceName), $SYSTEM.Event.Delete(resourceName)
Kill @node
Quit
WaitAck
Set start = $zh, match = $$$NODo {
Set syncResult = $SYSTEM.Event.WaitMsg("", ..#ACKTIMEOUT)
Set syncStatus = $ListGet(syncResult, 1)
If syncStatus < 1Do UnSubscribe QuitSet msg = {}.%FromJSON($ListGet(syncResult, 2))
Set match = msg.MessageID = NotifyObject.MessageID ; Ok this is the expected ackQuit:match
} While (start + ..#ACKTIMEOUT > $zh)
Do:'match UnSubscribe ; no ACK received -> force Unsubscribe this subscriberQuit
}
ClassMethod GenEventObject(Event As%String) As%DynamicObject
{
Quit {
"Event":(Event),
"EventType":($Piece(Event,":",1)),
"EventName":($Piece(Event,":",2)),
"PIDSource":($JOB),
"Timestamp":($ZDateTime($Horolog,3,1)),
"MessageID":($Increment(^dc.ipcutils.msg))
}
}
ClassMethod HashContext(Context As%DynamicObject) As%String [ CodeMode = expression, Internal, Private ]
{
$ZCRC($Select($IsObject(Context):Context.%ToJSON(),1:Context),7)
}
ClassMethod ShowSubscribed()
{
Set node = $Name($$$Subscribed)
For {
Set node = $Query(@node,1,value)
Quit:node=""Write !," * Event: ", $QSubscript(node,1), " ClassName: ", $QSubscript(node,2)
Write !," Date time: ",$Lg(value,3)
Write !," PID: ", $Lg(value,4)
Write !," Context: ",$Lg(value,5)
If$ListGet(value,6)=1Write !," ResourceName: ",$QSubscript(node,3)
}
Quit
}
ClassMethodKill() [ Internal ]
{
Kill$$$Subscribed
}
}
Avec le "Manager" en place, nous pouvons désormais développer un premier listener concret :
/// Basic Listener for demo purpose<br/>/// Open a terminal: <br/>/// Set listener = ##class(dc.ipcutils.BasicListener).%New()<br/>/// Do listener.Listen()<br/>/// or in one line : Do ##class(dc.ipcutils.BasicListener).%New().Listen()<br/>/// Type ctrl+c to stop.<br/>/// Open anoter terminal:<br/>/// Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTestString","This is string notification")<br/>/// Do ##class(dc.ipcutils.Manager).Notify("Demo","This is a demo notification")<br/>/// Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTestObject",{"object":"demo"})<br/>Class dc.ipcutils.BasicListener Extends dc.ipcutils.ListenerAbstract
{
Parameter EVENTTYPE = "Demo";Parameter VERBOSE = 1;
Method Listen() As%Status
{
Set sc = $$$OK$$$QuitOnError(..OnStartListen())
Try {
Write:..Verbose !,$zdt($h,3,1)," + Type < ctrl+c > to stop listening."ForIf..Wait() = -1$$$ThrowStatus($$$ERROR($$$GeneralError,$$$FormatText("Resource %1 deleted.",..ResourceName)))
} Catch Ex {
If Ex.Name '[ "<INTERRUPT>"Set sc = Ex.AsStatus()
If..Verbose, $$$ISERR(sc) Do$SYSTEM.Status.DisplayError(sc)
}
Quit$$$ADDSC(sc,..OnStopListen())
}
Method Update(
Event As%DynamicObject,
Data As%DynamicObject,
Context As%DynamicObject) As%Status
{
Set dt = $ZDateTime($Horolog, 3, 1)
Write:..Verbose !,dt," + Update received!"Write !,dt, " = Event : ", !
Do##class(%JSON.Formatter).%New().Format(Event)
Write !,dt, " = Context : ", !
Do##class(%JSON.Formatter).%New().Format(Context)
Write !,dt, " = Data : ", !
Do##class(%JSON.Formatter).%New().Format(Data)
Quit$$$OK
}
ClassMethod Test() As%Status
{
Quit ..%New().Listen()
}
}
Tout est à présent prêt pour effectuer un test. Ouvrez un terminal (ou plusieurs) avec une instance de dc.observer.BasicListener :
IRISAPP>d##class(dc.ipcutils.BasicListener).Test()
2023-10-1719:20:34 + Listening Demo with resourcename MjAxMjIzMzE5NzkwOTg1 started.
2023-10-1719:20:34 + Type < ctrl+c > to stop listening.Et un autre pour notifier des évènements "Demo":
Do##class(dc.ipcutils.Manager).Notify("Demo:OnTest",{"Message":"My FirstTest"}.%ToJSON())IRISAPP>d##class(dc.ipcutils.BasicListener).Test()
2023-10-1719:20:34 + Listening Demo with resourcename MjAxMjIzMzE5NzkwOTg1 started.
2023-10-1719:20:34 + Type < ctrl+c > to stop listening.
2023-10-1719:20:44 + Update received!
2023-10-1719:20:44 = Event :
{
"Event":"Demo:OnTestObject",
"EventType":"Demo",
"EventName":"OnTestObject",
"PIDSource":"20165",
"Timestamp":"2023-10-17 19:20:44",
"MessageID":2171
}
2023-10-1719:20:44 = Context :
{
}
2023-10-1719:20:44 = Data :
{
"object":"demo-1"
}Nous disposons désormais d'un exemple fonctionnel et relativement simple à mettre en place. Il vous suffit de créer une classe qui hérite de "dc.ipcutils.ListenerAbstract" et de l'adapter à vos besoins spécifiques. Bien que cela ne soit pas l'Observer Pattern, cette solution accomplit efficacement sa tâche.
À bientôt!
Comments
Super article, en plus avec une exclusivité en langue française :)
Ça me rappel des souvenir d'un projet ou nous avions construit une machine à état : https://developer.mozilla.org/fr/docs/Glossary/State_machine
A l'époque, les messages queues n'existaient pas et ton tuto non plus d’ailleurs ;) ça nous aurait bien aidé.
Pense le compléter avec un exemple appliqué aux websocket, j'ai l'impression que ça s'y prête bien.
Merci !!! Je rejoins Guillaume, je ne connaissais pas $SYSTEM.Event, et je cherchais quelque chose de similaire. Beau travail 😃