Problème avec le FTP OutboundAdapter sur IRIS PEX
Bonjour,
En souhaitant créer une production qui récupère un fichier provenant d'une API et qui envoie ce fichier sur un serveur SFTP, j'ai rencontré un problème avec la librairie de Guillaume Rongier.
Je reçois des bytes depuis une opération jusque là aucun problème, j'ai lié un SFTP à mon opération, les credentials sont les bons, l'adresse ip, le port, le dossier dans lequel déposer le fichier également. Cependant j'ai cette erreur:
ERROR <Ens>ErrException: <METHOD DOES NOT EXIST>PutStream+11^EnsLib.FTP.OutboundAdapter.1 *Rewind,%SYS.Python -- logged as '-'
number - @''Voici mon code:
from grongier.pex import BusinessOperation
from datetime import datetime
from X.xImport import XMessage
classXArticleExport(BusinessOperation):defget_adapter_type():return"EnsLib.FTP.OutboundAdapter"defon_init(self):return super().on_init()
defon_message(self, request: XMessage):# message = request.Stream.decode("utf-8")if hasattr(request, "Stream"):
filename = "exportArticles_" + datetime.now().strftime("%Y%m%d") + ".csv"
esc: int = self.Adapter.PutStream(filename, request.Stream)
if esc == 1:
self.log_info("File imported successfully with code : " + esc)
else:
self.log_error("Pas de stream reçu ! Code: " + esc)
return super().on_message(request)
Le problème vient-il de moi, où s'agit-il d'un bug ?
Merci d'avance !
Cordialement,
Cyril
Product version: IRIS 2023.1
$ZV: IRIS for UNIX (Ubuntu Server LTS for x86-64 Containers) 2023.1.1 (Build 380U) Fri Jul 7 2023 23:53:46 EDT [Health:5.1.0-1.m1]
Discussion (1)0
Comments
Salut Cyril,
La méthode PutStream() prend un objet %Stream d'IRIS en paramètre. Il faut donc que tu convertisses ton fichier en %Stream avant de l'envoyer.
Exemple :
XMessage
from grongier.pex import Message
from dataclasses import dataclass
@dataclass
class XMessage(Message):
StringStream: str
BinaryStream: bytes
BusinessOperation
from grongier.pex import BusinessOperation
from datetime import datetime
from X.xImport import XMessage
import iris
class XArticleExport(BusinessOperation):
def get_adapter_type():
return "EnsLib.FTP.OutboundAdapter"
def on_init(self):
return super().on_init()
def on_message(self, request: XMessage):
if hasattr(request, "Stream"):
# Préparation du stream
stream = iris.cls("%Stream.GlobalCharacter")._New()
source = ""
# Si le stream est en string on l'utilise tel quel
if request.StringStream:
source = request.StringStream
elif request.BinaryStream:
source = request.BinaryStream.decode("utf-8")
# On écrit le contenu du fichier dans le stream
n = 4092
chunks = [source[i:i+n] for i in range(0, len(source), n)]
for chunk in chunks:
stream.Write(chunk)
filename = "exportArticles_" + datetime.now().strftime("%Y%m%d") + ".csv"
esc: int = self.Adapter.PutStream(filename, stream)
if esc == 1:
self.log_info("File imported successfully with code : " + esc)
else:
self.log_error("Pas de stream reçu ! Code: " + esc)
return super().on_message(request)