
HL7v2 Parser

HL7 Version 2.x (HL7v2) is a messaging standard for electronic data exchange in the clinical domain and one of the most widely implemented healthcare standards in the world. It allows clinical data to be exchanged between systems, and is designed to support both a central patient care system and a more distributed environment where data resides in departmental systems. The HL7v2 parser supports data in HL7v2 format, either plain or framed using the Minimal Lower Layer Protocol (MLLP). For more information on the standard, see HL7 V2 Product Suite.
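To make the MLLP framing concrete, the following Python sketch wraps and unwraps a message using the framing bytes commonly defined for MLLP: a vertical tab (0x0B) before the message and a file separator plus carriage return (0x1C 0x0D) after it. The function names and the sample message are hypothetical and only illustrate the wire format; this is not how Striim implements the parser.

```python
from typing import List

# MLLP framing bytes as commonly defined for HL7v2 over TCP.
START_BLOCK = b"\x0b"       # <VT>, marks the start of a message
END_BLOCK = b"\x1c"         # <FS>, marks the end of the message data
CARRIAGE_RETURN = b"\x0d"   # <CR>, follows the end block


def mllp_wrap(message: bytes) -> bytes:
    """Wrap a raw HL7v2 message in an MLLP frame."""
    return START_BLOCK + message + END_BLOCK + CARRIAGE_RETURN


def mllp_unwrap_all(buffer: bytes) -> List[bytes]:
    """Extract every complete MLLP-framed message found in a byte buffer."""
    messages = []
    position = 0
    while True:
        start = buffer.find(START_BLOCK, position)
        if start == -1:
            break
        end = buffer.find(END_BLOCK + CARRIAGE_RETURN, start + 1)
        if end == -1:
            break  # incomplete frame; a real reader would wait for more bytes
        messages.append(buffer[start + 1:end])
        position = end + 2
    return messages


# Hypothetical two-segment message; HL7v2 segments are separated by <CR>.
sample = (
    b"MSH|^~\\&|LAB|CLINIC|EHR|HOSP|20240101120000||ORU^R01|MSG00001|P|2.5\r"
    b"PID|1||12345||DOE^JANE\r"
)
framed = mllp_wrap(sample)
```

A sender that uses this framing corresponds to the case where the parser's MLLPDelimited property is enabled; unframed files contain only the message bytes.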

Supported sources

The following sources are supported with the HL7v2 parser:

  • TCP Reader

  • Cloud-based storage: S3 Reader, GCS Reader, and ADLS Reader

  • File-based storage

  • Kafka Reader

The HL7v2 parser generates XMLNodeEvents (a Striim event type in the form of a DOM4J XML document). After reading and parsing from a source, you can write to any target that supports XMLNodeEvent (XML Formatter).

You can use a continuous query (CQ) to parse the XMLNodeEvent and retrieve the HL7v2 message content.
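As a rough illustration of what such a query does, the Python sketch below walks a hand-written XML document one child element at a time, in the same way the TQL samples on this page chain element(...) calls and finish with getText(). The XML is a hypothetical, heavily trimmed ORU_R01 message without namespaces; the exact document Striim produces may differ, and text_at is a helper invented for this sketch.

```python
import xml.etree.ElementTree as ET

# Hypothetical, trimmed XML for an ORU^R01 message (assumption: no namespaces).
SAMPLE_XML = """
<ORU_R01>
  <ORU_R01.PATIENT_RESULT>
    <ORU_R01.PATIENT>
      <PID>
        <PID.5>
          <XPN.1><FN.1>DOE</FN.1></XPN.1>
          <XPN.2>JANE</XPN.2>
        </PID.5>
      </PID>
    </ORU_R01.PATIENT>
  </ORU_R01.PATIENT_RESULT>
</ORU_R01>
"""


def text_at(root, *path):
    """Follow a chain of child element names and return the final element's text.

    Returns None if any element along the path is missing.
    """
    node = root
    for name in path:
        # Take the first direct child with this tag name.
        node = next((child for child in node if child.tag == name), None)
        if node is None:
            return None
    return node.text


root = ET.fromstring(SAMPLE_XML.strip())
pid5 = ("ORU_R01.PATIENT_RESULT", "ORU_R01.PATIENT", "PID", "PID.5")

family_name = text_at(root, *pid5, "XPN.1", "FN.1")
given_name = text_at(root, *pid5, "XPN.2")
print(root.tag, family_name, given_name)
```

The root element name identifies the message structure, which is why a query can filter on it before extracting fields.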

Use cases for HL7v2 Parser

The following are common use cases for the HL7v2 Parser:

  • Ingest an HL7v2 message into Striim for data transformation/processing: your HL7v2 messages may be stored as files in cloud storage. For example, you can read a file containing an HL7v2 message from Google Cloud Storage, use the HL7v2 Parser to parse the message into XML format, and print it to the console. See Sample application 1: ingest an HL7v2 message into Striim for data transformation/processing.

  • Process an ingested HL7v2 message for a business operation use case and pass it on to downstream applications: for example, every time an event is received, you can look up the patient in a cache file to get additional information about the patient, and enrich the event with that data. You can then examine the lab results, generate an alert for certain positive results, and write the results to a target such as Google BigQuery. See Sample application 2: process ingested HL7v2 message for a business operation use-case.

For TQL samples of both use cases, see Sample applications for HL7v2 parser.

HL7v2 Parser properties

| Property | Type | Default value | Comments |
|---|---|---|---|
| MLLPDelimited | Boolean | False | Specifies that the data is framed in MLLP format. When using TCP Reader, enable this property if the communication uses MLLP. |
| EnableMessageValidation | Boolean | False | Each HL7v2 message type requires certain segments and fields for a message to be considered valid. Enable this property to apply the default HL7v2 standard validation of message structure and fields. When this property is enabled together with the Acknowledge Message property in the TCP Reader, Striim sends a negative acknowledgement (NACK) message back to the sender for badly constructed HL7v2 messages. |

Note

See also the TCP Reader topic for information about the Acknowledge Message property.
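For readers unfamiliar with HL7v2 acknowledgements, the Python sketch below builds a generic acknowledgement for an inbound message. In the HL7v2 standard, the MSA segment carries an acknowledgement code (AA for accept, AE for error, AR for reject) followed by the control ID of the message being acknowledged. The build_ack function, the "ACK" control ID prefix, and the sample message are assumptions made for illustration; the exact content of the acknowledgement Striim sends is not described here.

```python
def build_ack(inbound: str, accepted: bool) -> str:
    """Build a minimal HL7v2 acknowledgement for an inbound message.

    Sketch only: the timestamp (MSH-7) is left empty and no ERR segment is added.
    """
    msh = inbound.split("\r")[0].split("|")
    # MSH-1 is the field separator itself, so msh[n - 1] holds MSH-n for n >= 2.
    encoding_chars = msh[1]
    sending_app, sending_facility = msh[2], msh[3]
    receiving_app, receiving_facility = msh[4], msh[5]
    control_id = msh[9]
    processing_id, version = msh[10], msh[11]

    code = "AA" if accepted else "AE"  # AE signals a negative acknowledgement
    ack_msh = "|".join([
        "MSH", encoding_chars,
        receiving_app, receiving_facility,   # the original receiver now sends
        sending_app, sending_facility,       # the original sender now receives
        "", "",                              # MSH-7 and MSH-8 left empty here
        "ACK", "ACK" + control_id,           # hypothetical control ID scheme
        processing_id, version,
    ])
    msa = "|".join(["MSA", code, control_id])
    return ack_msh + "\r" + msa + "\r"


# Hypothetical inbound message header.
inbound = "MSH|^~\\&|LAB|CLINIC|EHR|HOSP|20240101120000||ORU^R01|MSG00001|P|2.5\r"
nack = build_ack(inbound, accepted=False)
print(nack.replace("\r", "\n"))
```

The sender can match the acknowledgement to its original message through the control ID echoed in the MSA segment.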

Sample applications for HL7v2 parser

The following are sample applications for the HL7v2 parser, demonstrating the use cases previously described.

Sample application 1: ingest an HL7v2 message into Striim for data transformation/processing

This sample reads a file containing an HL7v2 message from Google Cloud Storage, parses the message into XML format, and prints it to the console.

CREATE OR REPLACE APPLICATION HL7v2;

CREATE OR REPLACE SOURCE MyGoogleHL7v2Data USING Global.GCSReader ( 
  BucketName: 'nmp-hl7v2-striim', 
  ConnectionRetryPolicy: 'retryInterval=30, maxRetries=3', 
  ServiceAccountKey: '1482-4daa73e14474.json', 
  PollingInterval: 5000, 
  ProjectId: 'nitin-infra-1482', 
  UseStreaming: false, 
  DownloadPolicy: 'DiskLimit=2048,FileLimit=10', 
  adapterName: 'GCSReader', 
  IncludeSubfolders: true, 
  ObjectDetectionMode: 'GCSDirectoryListing', 
  dontBlockOnEOF: 'true',
  ObjectFilter: '*.zip', 
  compressiontype: 'gzip' ) 
PARSE USING Global.HL7v2Parser ( 
  handler: 'com.webaction.proc.HL7v2Parser', 
  MLLPDelimited: false,
  DataValidation: false ) 
OUTPUT TO Hl7v2Out;

CREATE TARGET Sysout USING Global.SysOut ( 
  name: HL7v2XMLEvent ) 
INPUT FROM Hl7v2Out;

END APPLICATION HL7v2;

Sample application 2: process ingested HL7v2 message for a business operation use-case

This sample looks up the patient in a cache file each time an event is received, to get additional information about the patient, and enriches the event with that data. It then examines the lab results, generates an alert for certain positive results, and writes the results to a target such as Google BigQuery.

CREATE OR REPLACE APPLICATION HL7v2Demo;

CREATE OR REPLACE TYPE CacheFile (
 PAN java.lang.String,
 FirstName java.lang.String,
 LastName java.lang.String,
 Address java.lang.String,
 City java.lang.String,
 State java.lang.String,
 Zip java.lang.String,
 Gender java.lang.String);

CREATE SOURCE MLLP_HL7_Reader USING Global.TCPReader ( 
  blocksize: 64, 
  AckMLLP: true, 
  IPAddress: 'localhost', 
  portno: 10000, 
  maxconcurrentclients: 5 ) 
PARSE USING Global.HL7v2Parser ( 
  handler: 'com.webaction.proc.HL7v2Parser', 
  MLLPDelimited: true,
  DataValidation: false ) 
OUTPUT TO HL7_Stream;

CREATE OR REPLACE STREAM AlertStream OF Global.AlertEvent;

CREATE OR REPLACE CACHE PatientLookUp USING Global.FileReader ( 
  rolloverstyle: 'Default', 
  blocksize: 64, 
  skipbom: true, 
  wildcard: 'HL7PatientLookUp.csv', 
  directory: '/HOME/Healthcare/HL7v2/HL7', 
  includesubdirectories: false, 
  positionbyeof: false ) 
PARSE USING Global.DSVParser ( 
  blockascompleterecord: false,
  charset: 'UTF-8',
  columndelimiter: ',',
  columndelimittill: '-1',
  handler: 'com.webaction.proc.DSVParser_1_0',
  header: true,
  headerlineno: 0,
  ignoreemptycolumn: false,
  ignoremultiplerecordbegin: 'true',
  ignorerowdelimiterinquote: false,
  linenumber: '-1',
  nocolumndelimiter: false,
  quoteset: '\"',
  rowdelimiter: '\n',
  separator: ':',
  trimquote: true,
  trimwhitespace: false ) 
QUERY ( 
  keytomap: 'FirstName',
  skipinvalid: 'false' ) 
OF CacheFile;

CREATE OR REPLACE CQ FilterPatientNameCQ 
INSERT INTO PatientNameFilter_Stream 
SELECT 
data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.PATIENT").element("PID").element("PID.5").
  element("XPN.2").getText() as FirstName,
data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.PATIENT").element("PID").element("PID.5").
  element("XPN.1").element("FN.1").getText() as LastName,
data as data
FROM HL7_Stream p

WHERE data.getName() = "ORU_R01";

CREATE OR REPLACE TARGET Slack_Alert USING Global.SlackAlertAdapter ( 
  adapterName: 'SlackAlertAdapter', 
  OauthToken: 'hK26UnbbLabc123JhkxvVcMaVgwLwfTEi8DCCKgYKfx8HtFuCYwc1R4nHosC3UcuYF8inSuso9abc123==', 
  ChannelName: 'striim-slack-alert-test' ) 
INPUT FROM AlertStream;

CREATE OR REPLACE CQ PatientFilterCQ 
INSERT INTO Patient_Hits_Stream 
SELECT p.FirstName as FirstName,
p.LastName as LastName,
data as data,
f.Address as Address,
f.City as City,
f.State as State
FROM PatientNameFilter_Stream p, PatientLookUp f
WHERE p.LastName = f.LastName and p.FirstName = f.FirstName;

CREATE CQ PositivePatientsCQ 
INSERT INTO PositivePatients_Stream 
SELECT * FROM Patient_Hits_Stream p
WHERE 
data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.ORDER_OBSERVATION").
  element("ORU_R01.OBSERVATION").element("OBX").element("OBX.5").element("CWE.5").getText() = "POSITIVE";

CREATE OR REPLACE CQ AlertEventCQ 
INSERT INTO AlertStream 
SELECT 
'Patient Found' as name,
'302' as keyVal,
'info' as severity,
'raise' as flag,
'Flagged patient `' + p.FirstName + ' ' + p.LastName + '` found to be POSITIVE for the following test: ' + 
  data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.ORDER_OBSERVATION").element("ORU_R01.OBSERVATION").
  element("OBX").element("OBX.3").element("CE.2").getText() as message
FROM PositivePatients_Stream p;

CREATE CQ GetPatientInfo 
INSERT INTO BQColumnStream 
SELECT p.FirstName, 
p.LastName, 
p.Address,
p.City,
p.State
FROM PositivePatients_Stream p;

CREATE OR REPLACE TARGET BigQuery USING Global.BigQueryWriter ( 
  ColumnDelimiter: '|', 
  Tables: 'howardtest.Patient', 
  NullMarker: 'NULL', 
  streamingUpload: 'false', 
  Encoding: 'UTF-8', 
  ServiceAccountKey: 'Platform/UploadedFiles/admin/striimexample-john-doe.json', 
  ConnectionRetryPolicy: 'totalTimeout=600, initialRetryDelay=10, retryDelayMultiplier=2.0,
    maxRetryDelay=60 , maxAttempts=5, jittered=True, initialRpcTimeout=10, rpcTimeoutMultiplier=2.0,
    maxRpcTimeout=30', 
  projectId: 'striimexample', 
  AllowQuotedNewLines: 'false', 
  CDDLAction: 'Process', 
  optimizedMerge: 'false', 
  TransportOptions: 'connectionTimeout=300, readTimeout=120', 
  adapterName: 'BigQueryWriter', 
  Mode: 'APPENDONLY', 
  StandardSQL: 'true', 
  includeInsertId: 'true', 
  QuoteCharacter: '\"', 
  BatchPolicy: 'eventCount:1000000, Interval:30' ) 
INPUT FROM BQColumnStream;

END APPLICATION HL7v2Demo;