HL7v2 Parser
HL7’s Version 2.x (V2) is a messaging standard for electronic data exchange in the clinical domain and a widely implemented standard for healthcare in the world. This messaging standard allows the exchange of clinical data between systems. It is designed to support a central patient care system as well as a more distributed environment where data resides in departmental systems. The HL7v2 parser supports data in HL7v2 format and HL7v2 format bundled using MLLP. For more information on the standard, see HL7 V2 Product Suite.
Supported sources
The following sources are supported with the HL7v2 parser:
TCP Reader
Cloud based storage: S3 Reader, GCS Reader, and ADLS Reader
File based storage
Kafka Reader
The HL7v2 parser will generate XMLNodeEvents (a Striim-based event in the form of a DOM4J XML document). After reading and parsing from a source, you can write to any target that supports the XMLNodeEvent (XML Formatter).
You can use a continuous query (CQ) to parse the XMLNodeEvent and retrieve the HL7v2 message content.
Use cases for HL7v2 Parser
The following are common use cases for the HL7v2 Parser:
Ingest an HL7v2 message into Striim for data transformation/processing: your HL7v2 messages may be loaded onto a cloud storage file. For example, you may read a file from Google Cloud storage that contains an HL7v2 message. You can use the HL7v2 Parser to parse the message into XML format, and print it to the console. See Sample application 1: ingest an HL7v2 message into Striim for data transformation/processing.
Process an ingested HL7v2 message for a business operation use-case, as well as pass it onto downstream applications: for example, every time an event is received, you want to look up a patient's file from a cache file to learn additional information about the patient. You can enrich your data with the data from the cache file, such as lab results. You can examine those results, and generate an alert for certain positive lab results. You can write the results to a target such as Google BigQuery. See Sample application 2: process ingested HL7v2 message for a business operation use-case.
For TQL samples for both these use cases, see Samples applications for HL7v2 parser.
HL7v2 Parser properties
Property | type | default value | comments |
---|---|---|---|
MLLPDelimited | Boolean | False | This property specifies that data is shared in MLLP format. In TCP Reader you should enable this property if the communication is in MLLP format. |
EnableMessageValidation | Boolean | False | HL7v2 message type requires certain segments and fields to be considered valid messages. Adding this boolean switch on the parser to enable default message structure/field validation. This is HL7v2 standard message validation. When this property is enabled with the |
Note
See also the TCP Reader topic for information about the Acknowledge Message property.
Samples applications for HL7v2 parser
The following are sample applications for the HL7v2 parser, demonstrating the use cases previously described.
Sample application 1: ingest an HL7v2 message into Striim for data transformation/processing
This sample reads a file from Google Cloud storage that contains an HL7v2 message, parses the message into XML format, and prints it to the console.
CREATE OR REPLACE APPLICATION HL7v2; CREATE OR REPLACE SOURCE MyGoogleHL7v2Data USING Global.GCSReader ( BucketName: 'nmp-hl7v2-striim', ConnectionRetryPolicy: 'retryInterval=30, maxRetries=3', ServiceAccountKey: '1482-4daa73e14474.json', PollingInterval: 5000, ProjectId: 'nitin-infra-1482', UseStreaming: false, DownloadPolicy: 'DiskLimit=2048,FileLimit=10', adapterName: 'GCSReader', IncludeSubfolders: true, ObjectDetectionMode: 'GCSDirectoryListing', dontBlockOnEOF: 'true', ObjectFilter: '*.zip', compressiontype: 'gzip' ) PARSE USING Global.HL7v2Parser ( handler: 'com.webaction.proc.HL7v2Parser', MLLPDelimited: false, DataValidation: false ) OUTPUT TO Hl7v2Out; CREATE TARGET Sysout USING Global.SysOut ( name: HL7v2XMLEvent ) INPUT FROM Hl7v2Out; END APPLICATION HL7v2;
Sample application 2: process ingested HL7v2 message for a business operation use-case
This sample looks up a patient's file from a cache file when an event is received, to learn additional information about the patient. The sample then enriches your data with the data from the cache file, such as lab results. The sample examines those results, and generates an alert for certain positive lab results. The sample writes the results to a target such as Google BigQuery.
CREATE OR REPLACE APPLICATION HL7v2Demo; CREATE OR REPLACE TYPE CacheFile ( PAN java.lang.String, FirstName java.lang.String, LastName java.lang.String, Address java.lang.String, City java.lang.String, State java.lang.String, Zip java.lang.String, Gender java.lang.String); CREATE SOURCE MLLP_HL7_Reader USING Global.TCPReader ( blocksize: 64, AckMLLP: true, IPAddress: 'localhost', portno: 10000, maxconcurrentclients: 5 ) PARSE USING Global.HL7v2Parser ( handler: 'com.webaction.proc.HL7v2Parser', MLLPDelimited: true, DataValidation: false ) OUTPUT TO HL7_Stream; CREATE OR REPLACE STREAM AlertStream OF Global.AlertEvent; CREATE OR REPLACE CACHE PatientLookUp USING Global.FileReader ( rolloverstyle: 'Default', blocksize: 64, skipbom: true, wildcard: 'HL7PatientLookUp.csv', directory: '/HOME/Healthcare/HL7v2/HL7', includesubdirectories: false, positionbyeof: false ) PARSE USING Global.DSVParser ( blockascompleterecord: false, charset: 'UTF-8', columndelimiter: ',', columndelimittill: '-1', handler: 'com.webaction.proc.DSVParser_1_0', header: true, headerlineno: 0, ignoreemptycolumn: false, ignoremultiplerecordbegin: 'true', ignorerowdelimiterinquote: false, linenumber: '-1', nocolumndelimiter: false, quoteset: '\"', rowdelimiter: '\n', separator: ':', trimquote: true, trimwhitespace: false ) QUERY ( keytomap: 'FirstName', skipinvalid: 'false' ) OF CacheFile; CREATE OR REPLACE CQ FilterPatientNameCQ INSERT INTO PatientNameFilter_Stream SELECT data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.PATIENT").element("PID").element("PID.5"). element("XPN.2").getText() as LastName, data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.PATIENT").element("PID").element("PID.5"). element("XPN.1").element("FN.1").getText() as FirstName, data as data FROM HL7_Stream p WHERE data.getName() = "ORU_R01"; CREATE OR REPLACE TARGET Slack_Alert USING Global.SlackAlertAdapter ( adapterName: 'SlackAlertAdapter', OauthToken: 'hK26UnbbLabc123JhkxvVcMaVgwLwfTEi8DCCKgYKfx8HtFuCYwc1R4nHosC3UcuYF8inSuso9abc123==', ChannelName: 'striim-slack-alert-test' ) INPUT FROM AlertStream; CREATE OR REPLACE CQ PatientFilterCQ INSERT INTO Patient_Hits_Stream SELECT p.FirstName as FirstName, p.LastName as LastName, data as data, f.Address as Address, f.City as City, f.State as State FROM PatientNameFilter_Stream p, PatientLookUp f WHERE p.LastName = f.LastName and p.FirstName = f.FirstName; CREATE CQ PositivePatientsCQ INSERT INTO PositivePatients_Stream SELECT * FROM Patient_Hits_Stream p WHERE data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.ORDER_OBSERVATION"). element("ORU_R01.OBSERVATION").element("OBX").element("OBX.5").element("CWE.5").getText() = "POSITIVE"; CREATE OR REPLACE CQ AlertEventCQ INSERT INTO AlertStream SELECT 'Patient Found' as name, '302' as keyVal, 'info' as severity, 'raise' as flag, 'Flagged patient `' + p.FirstName + ' ' + p.LastName + '` found to be POSITIVE for the following test: ' + data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.ORDER_OBSERVATION").element("ORU_R01.OBSERVATION"). element("OBX").element("OBX.3").element("CE.2").getText() as message FROM PositivePatients_Stream p; CREATE CQ GetPatientInfo INSERT INTO BQColumnStream SELECT p.FirstName, p.LastName, p.Address, p.City, p.State FROM PositivePatients_Stream p; CREATE OR REPLACE TARGET BigQuery USING Global.BigQueryWriter ( ColumnDelimiter: '|', Tables: 'howardtest.Patient', NullMarker: 'NULL', streamingUpload: 'false', Encoding: 'UTF-8', ServiceAccountKey: 'Platform/UploadedFiles/admin/striimexample-john-doe.json', ConnectionRetryPolicy: 'totalTimeout=600, initialRetryDelay=10, retryDelayMultiplier=2.0, maxRetryDelay=60 , maxAttempts=5, jittered=True, initialRpcTimeout=10, rpcTimeoutMultiplier=2.0, maxRpcTimeout=30', projectId: 'striimexample', AllowQuotedNewLines: 'false', CDDLAction: 'Process', optimizedMerge: 'false', TransportOptions: 'connectionTimeout=300, readTimeout=120', adapterName: 'BigQueryWriter', Mode: 'APPENDONLY', StandardSQL: 'true', includeInsertId: 'true', QuoteCharacter: '\"', BatchPolicy: 'eventCount:1000000, Interval:30' ) INPUT FROM BQColumnStream; END APPLICATION HL7v2Demo;