Examples


Introduction

This part presents a detailed description of the available CogStack examples, which currently are:

  • Example 1 – Processing a simple, structured dataset from a single DB source.

  • Example 2 – Processing a combined structured and free-text dataset from a single DB source (as in Quickstart).

  • Example 3 – Processing a combined dataset from multiple DB sources, multiple jobs.

  • Example 4 – Processing a combined dataset with embedded documents from a single DB source.

  • Example 5 – 2-step processing of a combined dataset with embedded documents from a single DB source.

  • Example 6 – Basic security use-case: Example 2 extended with a reverse proxy enabling secure access.

  • Example 7 – Logging: Example 6 extended with logging mechanisms.

  • Example 8 – Simple NLP use-case: drug annotation using GATE and based on Example 2.

  • Example 9 – Defining multi-component pipelines: Example 4 and Example 8 combined.

  • Example 10 –  Ingesting free-text data from DB source to ES, annotating with MedCAT and re-ingesting back to ES. 

  • Sample production deployment – a structured project setup of files to be used for production deployments.

The main directory with resources used in this tutorial is available in the CogStack bundle under the examples directory.

Some parts of this document are also used in CogStack Quickstart tutorial.



Note

This tutorial is based on the newest version of CogStack, 1.3.0.

The previous versions of CogStack and its documentation can be found in the official GitHub repository.








How are the examples organized

Each of the examples is organized in a way that it can be deployed and run independently. The directory structure of the examples/ tree is as follows:

.
├── docker-common
│   ├── elasticsearch
│   │   └── config
│   │       └── elasticsearch.yml
│   ├── fluentd
│   │   └── conf
│   │       └── fluent.conf
│   ├── kibana
│   │   └── config
│   │       └── kibana.yml
│   ├── nginx
│   │   ├── auth
│   │   └── config
│   │       └── nginx.conf
│   ├── pgsamples
│   │   └── init_db.sh
│   ├── pgjobrepo
│   │   └── create_repo.sh
│   └── docker-compose.yml
├── example1
│   ├── cogstack
│   │   └── observations.properties
│   ├── db_dump
│   │   └── db_samples.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── extra
│   │   ├── db_create_schema.sql
│   │   └── prepare_db.sh
│   └── setup.sh
├── example10
│   ├── cogstack-pipeline
│   │   ├── annotation_ingester
│   │   │   └── config
│   │   │       └── config.yml
│   │   ├── cogstack
│   │   │   ├── conf
│   │   │   │   └── step-1
│   │   │   │       └── es_ingestion.properties
│   │   │   └── run_pipeline.sh
│   │   ├── docker-compose.yml
│   │   ├── elasticsearch
│   │   │   ├── config
│   │   │   │   └── elasticsearch.yml
│   │   │   └── scripts
│   │   │       └── es_index_initializer.py
│   │   ├── kibana
│   │   │   └── config
│   │   │       └── kibana.yml
│   │   ├── medcat_service
│   │   │   └── envs
│   │   │       ├── env_app
│   │   │       ├── env_medcat
│   │   │       ├── env_medcat_snomed
│   │   │       └── env_medcat_umls
│   │   └── scripts
│   │       ├── create_pgsql_job_repo.sh
│   │       └── create_source_and_sink_db.sh
│   ├── data
│   │   ├── db_samples.sql.gz
│   │   └── models
│   │       └── download_medmen.sh
│   └── README
├── example2
│   ├── cogstack
│   │   └── observations.properties
│   ├── db_dump
│   │   └── db_samples.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── extra
│   │   ├── db_create_schema.sql
│   │   └── prepare_db.sh
│   └── setup.sh
├── example3
│   ├── cogstack
│   │   ├── gen_config.sh
│   │   ├── mt.properties
│   │   └── template.properties
│   ├── db_dump
│   │   ├── db_samples-mt.sql.gz
│   │   └── db_samples-syn.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── extra
│   │   ├── db_create_mt_schema.sql
│   │   ├── db_create_syn_schema.sql
│   │   ├── prepare_mtsamples_db.sh
│   │   └── prepare_synsamples_db.sh
│   └── setup.sh
├── example4
│   ├── cogstack
│   │   ├── observations.properties
│   │   └── test2.sh
│   ├── db_dump
│   │   ├── db_samples-docx-small.sql.gz
│   │   ├── db_samples-jpg-small.sql.gz
│   │   ├── db_samples-pdf-img-small.sql.gz
│   │   └── db_samples-pdf-text-small.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── extra
│   │   ├── db_create_schema.sql
│   │   ├── prepare_db.sh
│   │   └── prepare_single_db.sh
│   └── setup.sh
├── example5
│   ├── cogstack
│   │   ├── conf
│   │   │   ├── step-1
│   │   │   │   └── reports.properties
│   │   │   └── step-2
│   │   │       └── observations.properties
│   │   └── test2.sh
│   ├── db_dump
│   │   ├── db_samples-docx-small.sql.gz
│   │   ├── db_samples-jpg-small.sql.gz
│   │   ├── db_samples-pdf-img-small.sql.gz
│   │   └── db_samples-pdf-text-small.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── extra
│   │   ├── db_create_schema.sql
│   │   ├── prepare_db.sh
│   │   └── prepare_single_db.sh
│   └── setup.sh
├── example6
│   ├── cogstack
│   │   └── observations.properties
│   ├── db_dump
│   │   └── db_samples.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   └── setup.sh
├── example7
│   ├── cogstack
│   │   └── observations.properties
│   ├── db_dump
│   │   └── db_samples.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   └── setup.sh
├── example8
│   ├── cogstack
│   │   └── observations.properties
│   ├── db_dump
│   │   └── db_samples.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── gate
│   │   └── app
│   │       ├── active.lst
│   │       ├── drug.gapp
│   │       ├── drug.lst
│   │       └── lists.def
│   ├── extra
│   │   └── clean_list.py
│   └── setup.sh
├── example9
│   ├── cogstack
│   │   └── observations.properties
│   ├── db_dump
│   │   └── db_samples.sql.gz
│   ├── docker
│   │   └── docker-compose.override.yml
│   ├── gate
│   │   └── app
│   │       ├── active.lst
│   │       ├── drug.gapp
│   │       ├── drug.lst
│   │       └── lists.def
│   └── setup.sh
├── rawdata
│   ├── mtsamples-txt-full.tgz
│   ├── mtsamples-txt-small.tgz
│   └── synsamples.tgz
├── download_db_dumps.sh
├── prepare_db_dumps.sh
└── prepare_docs.sh



Info

Note: the contents of the db_dump subdirectories for each example will be created after running the download_db_dumps.sh script (please see below).







Common and reusable components

The directory docker-common contains some common components and microservice configuration files that are used within all the examples. These components include:

  • PostgreSQL databases: pgsamples and pgjobrepo directories,

  • ElasticSearch node: elasticsearch directory,

  • Kibana webservice dashboard: kibana directory,

  • nginx reverse proxy service: nginx directory,

  • Fluentd logging driver: fluentd directory,

  • Common microservices Docker Compose base configuration file used across examples: docker-compose.yml.

Examples

The example* directories store the content of the examples, each containing the following subdirectories:

  • cogstack directory containing CogStack configuration files and/or custom pipeline scripts,

  • db_dump directory containing database dumps used to initialize the samples input database,

  • docker directory containing configuration files for docker-based deployment,

  • extra directory containing scripts to generate database dumps locally (optional) or additional, useful materials,

  • setup.sh script to initialize the example before running it for the first time.

For a detailed description of each example please refer to its appropriate section.

Raw data

The directory rawdata contains the raw EHRs data which will be used to prepare the initial database dumps for running the examples. The datasets used in all the examples are the ones used in the Quickstart guide – please refer to it for a more detailed description of the available datasets.

Data preparation scripts

The script prepare_docs.sh is used to prepare the document data for Example 4 and Example 5 in PDF, DOCX and image formats.

The script prepare_db_dumps.sh is used to prepare all the database dumps locally to initialize the examples.

Alternatively, the script download_db_dumps.sh can be used to automatically download all the database dumps.
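For instance, to fetch the prepared dumps for all the examples at once, one can run the following (a minimal sketch; we assume here that the script is executed from the examples/ directory where it resides):

cd examples
bash download_db_dumps.sh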





General information about examples

Database schema

In the current implementation, CogStack can only ingest EHR data from a specified input database and relation. This is why, in order to process the sample patient data covered in this tutorial, one needs to create an appropriate database schema and load the EHR data into the input database. The processed data can then be stored in a preferred data sink – either as records in a database, as documents in an ElasticSearch cluster, or as files in a filesystem.

The most commonly used data sink is going to be an ElasticSearch cluster (in our examples – just a single node). However, as relational join statements impose a high performance burden on ElasticSearch, it is best to store the EHR data denormalized in ElasticSearch. This is why, for the moment, we rely on ingesting the data from additional view(s) created in the sample database.
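As a simple illustration of such denormalization (an illustrative sketch only – the view name and selected columns below are not the actual views used by the examples, which are defined in their respective sections), a view can flatten a patient–observation relationship into a single row per observation:

CREATE VIEW example_flat_view AS
SELECT
  p.id AS patient_id,
  p.gender AS patient_gender,
  obs.code AS observation_code,
  obs.value AS observation_value
FROM patients p, observations obs
WHERE obs.patient = p.id ;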

Properties file

Database source

When using PostgreSQL database as a data source, please note that the source.poolSize property defines the maximum size of the connection pool available for performing queries by CogStack engine. A PostgreSQL database, by default, has a maximum connection limit set to 100, hence exceeding the limit (either by a single job or multiple parallel ones) may lead to termination of the data pipeline.

One of the solutions to overcome this issue can be to run the PostgreSQL container with additional options specified in Docker-compose file:

command: "-c 'shared_buffers=256MB' -c 'max_connections=1000'"

which increases the connection limit along with the RAM available for connection buffers.
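In a Docker Compose file such options are passed through the command key of the PostgreSQL service – a minimal sketch only (the image tag below is illustrative; the service name samples-db follows the examples):

  samples-db:
    image: postgres:11.4-alpine
    # increase shared buffers and the maximum number of connections
    command: "-c 'shared_buffers=256MB' -c 'max_connections=1000'"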

ElasticSearch sink

As an additional feature, security and SSL encryption can be enabled for communication with ElasticSearch. However, this uses the ElasticSearch X-Pack bundle and requires a license for commercial deployments, hence it is disabled by default.

Moreover, the ElasticSearch and Kibana images used in this tutorial are not bundled with X-Pack, using the -oss variant of the image as defined in examples/docker-common/docker-compose.yml.
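For reference, the relevant image definitions in such a Compose file use the -oss variants – a sketch only (the exact version tags may differ from the ones shipped in the bundle):

  elasticsearch-1:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.2
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:6.8.2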

Partitioner

In the current implementation, the CogStack engine can only partition the data using the records' primary key (the partitioner.pkColumnName property, containing unique values) and the records' update time (the partitioner.timeStampColumnName property), as defined in the created views. Also, the table used for partitioning (the partitioner.tableToPartition property) needs to be specified. This is specified by the PKTimeStamp partitioning method, e.g.:

partitioner.partitionType = PKTimeStamp
partitioner.tableToPartition = <table-name>
partitioner.pkColumnName = <primary-key-column-name>
partitioner.timeStampColumnName = <timestamp-column-name>





Running CogStack

Setup

For ease of use, CogStack is deployed and run using Docker. However, before starting the CogStack ecosystem for the first time, a setup script needs to be run locally to prepare the Docker images and configuration files for the CogStack data processing engine. For each of the examples, the script is available in its examples/example*/ directory and can be run as:

bash setup.sh

As a result, a temporary directory __deploy/ will be created containing all the necessary artifacts to deploy CogStack.

Docker-based deployment

Next, we can proceed to deploy the CogStack ecosystem using Docker Compose. It will configure and start the microservices based on the provided Compose files:

  • common base configuration, copied from examples/docker-common/docker-compose.yml ,

  • example-specific configuration copied from examples/example*/docker/docker-compose.override.yml.

Moreover, the PostgreSQL database container comes with a pre-initialized database dump ready to be loaded directly into it.



In order to run CogStack, type the following in the examples/example*/__deploy/ directory:

docker-compose up

The console will print the status logs of the currently running microservices. For the moment, however, they may not be very informative (we're working on that).





Connecting to the microservices

CogStack ecosystem

The picture below sketches a general idea on how the microservices are running and communicating within a sample CogStack ecosystem.

Assuming that everything is working fine, we should be able to connect to the running microservices.

Kibana and ElasticSearch

Kibana dashboard used to query the EHRs can be accessed directly in browser via URL: http://localhost:5601/. The data can be queried using a number of ElasticSearch indices, e.g. sample_observations_view. Usually, each index will correspond to the database view in db_samples (samples-db PostgreSQL database) from which the data was ingested. However, when entering Kibana dashboard for the first time, an index pattern needs to be configured in the Kibana management panel – for more information about its creation, please refer to the official Kibana documentation.

In addition, the ElasticSearch REST endpoint can be accessed via the URL http://localhost:9200/. It can be used to perform manual queries or by other external services – for example, one can list the available indices:

curl 'http://localhost:9200/_cat/indices'

or query one of the available indices – sample_observations_view:

curl 'http://localhost:9200/sample_observations_view'
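One can also run a simple search against the index – for example, querying the documents by one of the view fields (here patient_city, as defined in observations_view):

curl 'http://localhost:9200/sample_observations_view/_search?q=patient_city:Boston&pretty'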

For more information about possible documents querying or modification operations, please refer to the official ElasticSearch documentation.

As a side note, the name for ElasticSearch node in the Docker Compose has been set as elasticsearch-1. The -1 ending emphasizes that for larger-scale deployments, multiple ElasticSearch nodes can be used – typically, a minimum of 3.

PostgreSQL sample database

Moreover, the PostgreSQL database with the input sample data is exposed directly at localhost:5555. The database name is db_samples, with user test and password test. To connect, one can run:

psql -U 'test' -W -d 'db_samples' -h localhost -p 5555
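Once connected, the source view can be inspected directly – e.g., to count the records available for ingestion:

SELECT COUNT(*) FROM observations_view;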





Example 1

General information

This is a very basic example covering only processing and ingestion of structured synthetic data into ElasticSearch. However, it forms a good base for starting to work with CogStack data processing pipelines.

Database schema

Patients table

The first few records of patient data (file: patients.csv from the Synthea-based samples) in CSV format are presented below:

ID,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
b9f5a11b-211d-4ced-b3ba-12012c83b937,1939-08-04,1996-03-15,999-11-9633,S99999830,X106007X,Mr.,Brady,Lynch,,,M,white,polish,M,Worcester,701 Schiller Esplanade,Fitchburg,Massachusetts,01420
fab43860-c3be-4808-b7b4-00423c02816b,1962-06-21,2011-03-10,999-67-8307,S99958025,X26840237X,Mrs.,Antonia,Benavides,,Padrón,M,hispanic,mexican,F,Rockland,643 Hand Bay,Boston,Massachusetts,02108
84dd6378-2ddc-44b6-9292-2a4461bcef53,1998-12-01,,999-50-5147,S99987241,,Mr.,Keith,Conn,,,,white,english,M,Rockland,461 Spinka Extension Suite 69,Framingham,Massachusetts,01701
9929044f-1f43-4453-b2c0-a2f45dcdd4be,2014-09-23,,999-64-4171,,,,Derrick,Lakin,,,,white,irish,M,Tewksbury,577 Hessel Lane,Hampden,Massachusetts,

The patients table definition in PostgreSQL according to the specification:

CREATE TABLE patients (
  id UUID PRIMARY KEY,
  birthdate DATE NOT NULL,
  deathdate DATE,
  ssn VARCHAR(64) NOT NULL,
  drivers VARCHAR(64),
  passport VARCHAR(64),
  prefix VARCHAR(8),
  first VARCHAR(64) NOT NULL,
  last VARCHAR(64) NOT NULL,
  suffix VARCHAR(8),
  maiden VARCHAR(64),
  marital CHAR(1),
  race VARCHAR(64) NOT NULL,
  ethnicity VARCHAR(64) NOT NULL,
  gender CHAR(1) NOT NULL,
  birthplace VARCHAR(64) NOT NULL,
  address VARCHAR(64) NOT NULL,
  city VARCHAR(64) NOT NULL,
  state VARCHAR(64) NOT NULL,
  zip VARCHAR(64)
) ;

Encounters table

Similarly, the first 5 records of patient encounters data (file: encounters.csv) are:

ID,START,STOP,PATIENT,CODE,DESCRIPTION,COST,REASONCODE,REASONDESCRIPTION
123ffd84-618e-47cd-abca-5fe95b72179a,1955-07-30T07:30Z,1955-07-30T07:45Z,b9f5a11b-211d-4ced-b3ba-12012c83b937,185345009,Encounter for symptom,129.16,36971009,Sinusitis (disorder)
66b016e9-e797-446a-8f2b-e5534acbbb04,1962-03-09T07:30Z,1962-03-09T07:45Z,b9f5a11b-211d-4ced-b3ba-12012c83b937,185349003,Encounter for check up (procedure),129.16,,
d517437d-50b5-4cab-aca2-9c010c06989e,1983-08-19T07:30Z,1983-08-19T08:00Z,b9f5a11b-211d-4ced-b3ba-12012c83b937,185349003,Encounter for check up (procedure),129.16,,
2452cf09-021b-4586-9e33-59d7d2242f31,1987-08-28T07:30Z,1987-08-28T07:45Z,b9f5a11b-211d-4ced-b3ba-12012c83b937,185349003,Encounter for check up (procedure),129.16,,
f25a828f-ae79-4dd0-b6eb-bca26138421b,1969-09-13T14:02Z,1969-09-13T14:34Z,fab43860-c3be-4808-b7b4-00423c02816b,185345009,Encounter for symptom,129.16,10509002,Acute bronchitis (disorder)

with the corresponding encounters table definition:

CREATE TABLE encounters (
  id UUID PRIMARY KEY NOT NULL,
  start TIMESTAMP NOT NULL,
  stop TIMESTAMP,
  patient UUID REFERENCES patients,
  code VARCHAR(64) NOT NULL,
  description VARCHAR(256) NOT NULL,
  cost REAL NOT NULL,
  reasoncode VARCHAR(64),
  reasondescription VARCHAR(256),
  document TEXT --(*)
) ;

Observations table

The next table is observations, where the first 5 rows of observations.csv file are:

DATE,PATIENT,ENCOUNTER,CODE,DESCRIPTION,VALUE,UNITS,TYPE
1987-08-28,b9f5a11b-211d-4ced-b3ba-12012c83b937,2452cf09-021b-4586-9e33-59d7d2242f31,8302-2,Body Height,180.3,cm,numeric
2002-06-27,fab43860-c3be-4808-b7b4-00423c02816b,fd113dfd-5e2c-40f2-98f3-e153665c3f53,8302-2,Body Height,165.2,cm,numeric
2009-04-07,84dd6378-2ddc-44b6-9292-2a4461bcef53,2058da3b-f7f3-4ebc-8c1e-360cd256cdcb,8302-2,Body Height,138.3,cm,numeric
2002-06-27,fab43860-c3be-4808-b7b4-00423c02816b,fd113dfd-5e2c-40f2-98f3-e153665c3f53,29463-7,Body Weight,81.8,kg,numeric
2009-04-07,84dd6378-2ddc-44b6-9292-2a4461bcef53,2058da3b-f7f3-4ebc-8c1e-360cd256cdcb,29463-7,Body Weight,24.6,kg,numeric

and the corresponding table definition:

CREATE TABLE observations (
  cid SERIAL PRIMARY KEY,                       --(*)
  created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  --(*)
  date DATE NOT NULL,
  patient UUID REFERENCES patients,
  encounter UUID REFERENCES encounters,
  code VARCHAR(64) NOT NULL,
  description VARCHAR(256) NOT NULL,
  value VARCHAR(64) NOT NULL,
  units VARCHAR(64),
  type VARCHAR(64) NOT NULL
) ;

Here, the additional fields with auto-generated values have been marked with --(*). These are: cid – an automatically generated primary key, and created – a record's creation timestamp. They will be later used by the CogStack engine for data partitioning when processing the records. The patients and encounters tables have their primary keys (the id field, of UUID type) already defined and included in the input CSV files.
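To illustrate how these auto-generated fields come into play (a sketch only – the examples ship with ready database dumps and the prepare_db.sh scripts handle the actual loading), the CSV columns can be loaded via psql while cid and created are filled in automatically by PostgreSQL:

\copy observations (date, patient, encounter, code, description, value, units, type) FROM 'observations.csv' WITH (FORMAT csv, HEADER);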

Database views

Next, we define an observations_view that will be used by the CogStack data processing engine to ingest the records from the input database:

CREATE VIEW observations_view AS
SELECT
  p.id AS patient_id,
  p.birthdate AS patient_birth_date,
  p.deathdate AS patient_death_date,
  p.ssn AS patient_ssn,
  p.drivers AS patient_drivers,
  p.passport AS patient_passport,
  p.prefix AS patient_prefix,
  p.first AS patient_first_name,
  p.last AS patient_last_name,
  p.suffix AS patient_suffix,
  p.maiden AS patient_maiden,
  p.marital AS patient_marital,
  p.race AS patient_race,
  p.ethnicity AS patient_ethnicity,
  p.gender AS patient_gender,
  p.birthplace AS patient_birthplace,
  p.address AS patient_addr,
  p.city AS patient_city,
  p.state AS patient_state,
  p.zip AS patient_zip,
  enc.id AS encounter_id,
  enc.start AS encounter_start,
  enc.stop AS encounter_stop,
  enc.code AS encounter_code,
  enc.description AS encounter_desc,
  enc.cost AS encounter_cost,
  enc.reasoncode AS encounter_reason_code,
  enc.reasondescription AS encounter_reason_desc,
  enc.document AS encounter_document,
  obs.cid AS observation_id,             --(*)
  obs.created AS observation_timestamp,  --(*)
  obs.date AS observation_date,
  obs.code AS observation_code,
  obs.description AS observation_desc,
  obs.value AS observation_value,
  obs.units AS observation_units,
  obs.type AS observation_type
FROM
  patients p,
  encounters enc,
  observations obs
WHERE
  enc.patient = p.id AND
  obs.patient = p.id AND
  obs.encounter = enc.id ;

The goal here is to denormalize the database schema for CogStack and ElasticSearch data ingestion, as the observations table references both the patients and encounters tables by their primary keys. In the current implementation, the CogStack pipeline engine cannot yet perform dynamic joins over the relational data from specific database tables.

Some of the crucial fields required for configuring the CogStack Pipeline engine with the Document data model have been marked with --(*) – these are:

  • observation_id – the unique identifier of the observation record (typically, the primary key),

  • observation_timestamp – the record creation or last update time.

These fields will be later used when preparing the properties configuration file for CogStack data processing workflow.

Properties file

General information

Each CogStack data processing pipeline is configured using a number of parameters defined in the corresponding Java properties file. In this example we use only one pipeline with configuration specified in examples/example1/cogstack/observations.properties file.

Spring profiles

CogStack configuration file uses a number of Spring profiles, which enable different components of the data processing pipeline. In this example we have:

spring.profiles.active = jdbc_in,elasticsearchRest,localPartitioning

which denotes that only the following profiles will be active:

  • jdbc_in for JDBC input database connector,

  • elasticsearchRest for using REST API for inserting documents to ElasticSearch,

  • localPartitioning for the local data partitioning functionality.

Data source

The parameters for specifying the data source are defined as follows:

source.JdbcPath = jdbc:postgresql://samples-db:5432/db_samples
source.Driver = org.postgresql.Driver
source.username = test
source.password = test

In this example we are using a PostgreSQL database, whose driver is defined by the source.Driver parameter. The PostgreSQL database service is available in the CogStack ecosystem as samples-db, exposes port 5432 for connections, and the sample database name is db_samples – all these details need to be included in the source.JdbcPath parameter field.

Next, we need to instruct the CogStack engine how to query the records from the data source:

source.selectClause = SELECT *
source.fromClause = FROM observations_view
source.sortKey = observations_id
source.primaryKeyFieldValue = observations_id
source.timeStamp = observations_timestamp
source.dbmsToJavaSqlTimestampType = TIMESTAMP



This is where the previously defined observations_view with the additional CogStack-specific fields is used.

Data sink

Next, we need to define the data sink – in our example, and by default, ElasticSearch is being used:

elasticsearch.cluster.host = elasticsearch-1
elasticsearch.cluster.port = 9200

Similarly to defining the sample database source, we need to provide the ElasticSearch host and port configuration according to the microservices definition in the corresponding Docker Compose file (see examples/example1/docker/docker-compose.override.yml).

In the next step, we specify some of the optional ElasticSearch indexing parameters:

elasticsearch.index.name = sample_observations_view
elasticsearch.excludeFromIndexing = observations_id

We specify the index name which will be used to store the documents processed by CogStack engine. Additionally, we specify which fields should be excluded from the indexing – by default, we usually exclude the binary content, the constant-value fields and the primary key from the observations_view.

Jobs and CogStack engine configuration

In order to coordinate the workers, the CogStack engine needs to keep information about the current jobs in an additional PostgreSQL database – cogstack-job-repo. Hence, similarly to the source database, this database needs to be specified:

jobRepository.JdbcPath = jdbc:postgresql://cogstack-job-repo:5432/cogstack
jobRepository.Driver = org.postgresql.Driver
jobRepository.username = cogstack
jobRepository.password = mysecretpassword
job.jobName = job_observations_view

The last parameter job.jobName is a default name for the jobs that will be created.

Partitioner and scheduler

Another set of useful parameters is related to controlling the job execution and data partitioning:

partitioner.partitionType = PKTimeStamp
partitioner.tableToPartition = observations_view
partitioner.pkColumnName = observations_id
partitioner.timeStampColumnName = observations_timestamp

Apart from data partitioning, it can be useful (although optional) to set up the scheduler – the following line corresponds to the scheduler configuration:

scheduler.useScheduling = false

In this example we do not use the scheduler, since we ingest the EHRs from the data source only once. However, when data is being generated continuously, the scheduler should be enabled to periodically run CogStack jobs to process the new EHRs. It is disabled by default.
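For continuously generated data, enabling the scheduler would look roughly as follows – a minimal sketch only (the name of the accompanying rate property is our assumption and should be checked against the CogStack properties documentation):

scheduler.useScheduling = true
# a cron-like rate property (e.g. scheduler.rate – name assumed here) defines how often new jobs are launched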

Deployment information

This example uses the standard stack of microservices as presented in Running CogStack based on the Docker Compose files: examples/docker-common/docker-compose.yml and  examples/example1/docker/docker-compose.override.yml.

The current example also uses a single CogStack properties file (see examples/example1/cogstack/observations.properties) and hence runs only one instance of CogStack data processing engine.





Example 2

General information

This example is an extension of Example 1. Apart from containing structured synthetic data, it also contains free-text documents data, hence creating a combined dataset.

This example is also covered as a main part of CogStack Quickstart tutorial.

Database schema

The database schema is almost the same as the one defined in Example 1. The only difference is an additional column in encounters table, as presented below.

Encounters table

The encounters table definition:

CREATE TABLE encounters (
  id UUID PRIMARY KEY NOT NULL,
  start TIMESTAMP NOT NULL,
  stop TIMESTAMP,
  patient UUID REFERENCES patients,
  code VARCHAR(64) NOT NULL,
  description VARCHAR(256) NOT NULL,
  cost REAL NOT NULL,
  reasoncode VARCHAR(64),
  reasondescription VARCHAR(256),
  document TEXT --(*)
) ;

Here, an additional document column of TEXT type has been marked with --(*). This extra field will be used to store the content of a document from the MTSamples dataset.

Just to clarify, the Synthea-based and MTSamples datasets are two unrelated datasets. Here, we are extending the synthetic dataset with the clinical documents from MTSamples to create a combined one, enabling a bit more interesting queries.

A sample document from MTSamples dataset is presented below:

Sample Type / Medical Specialty: Allergy / Immunology
Sample Name: Allergic Rhinitis
Description: A 23-year-old white female presents with complaint of allergies. (Medical Transcription Sample Report)
SUBJECTIVE: This 23-year-old white female presents with complaint of allergies. She used to have allergies when she lived in Seattle but she thinks they are worse here. In the past, she has tried Claritin, and Zyrtec. Both worked for short time but then seemed to lose effectiveness. She has used Allegra also. She used that last summer and she began using it again two weeks ago. It does not appear to be working very well. She has used over-the-counter sprays but no prescription nasal sprays. She does have asthma but doest not require daily medication for this and does not think it is flaring up.
MEDICATIONS: Her only medication currently is Ortho Tri-Cyclen and the Allegra.
ALLERGIES: She has no known medicine allergies.
OBJECTIVE: Vitals: Weight was 130 pounds and blood pressure 124/78. HEENT: Her throat was mildly erythematous without exudate. Nasal mucosa was erythematous and swollen. Only clear drainage was seen. TMs were clear. Neck: Supple without adenopathy. Lungs: Clear.
ASSESSMENT: Allergic rhinitis.
PLAN: 1. She will try Zyrtec instead of Allegra again. Another option will be to use loratadine. She does not think she has prescription coverage so that might be cheaper. 2. Samples of Nasonex two sprays in each nostril given for three weeks. A prescription was written as well.
Keywords: allergy / immunology, allergic rhinitis, allergies, asthma, nasal sprays, rhinitis, nasal, erythematous, allegra, sprays, allergic,

Database views

Analogously, the new document column field is included in the observations_view, where the view is based on the one defined in Example 1.
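For reference, the relevant addition to the view definition is the line exposing the document content (as already shown in the full view definition in Example 1):

-- inside observations_view
enc.document AS encounter_document,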

Properties file

The properties file used in this example is the same as in Example 1.

Deployment information

Similarly to Example 1, this one uses the standard stack of microservices, defined in examples/example2/docker/docker-compose.override.yml.

It also uses a single CogStack properties file and hence runs only one instance of CogStack data processing engine (see: examples/example2/cogstack/observations.properties).





Example 3

General information

This example covers a case of multiple data sources and multiple CogStack instances scenario. This example is a further extension of both Example 1 and Example 2. It extends Example 1 by defining schema for all the tables for Synthea-based patient data. It also extends Example 2 by defining a separate table for representing and storing MTSamples data.

Database schema

The database schema is based on the one defined in Example 1 where the same definition logic has been applied to the rest of CSV files available in the synthetic dataset. The complete database schema for the synthetic data is available in examples/example3/extra/db_create_syn_schema.sql file.

The only new table is the one for representing MTSamples data defined in examples/example3/extra/db_create_mt_schema.sql.

Samples table and view

The definition of mtsamples table and its corresponding view:

CREATE TABLE mtsamples (
  cid SERIAL PRIMARY KEY,                  --(*)
  sample_id INTEGER NOT NULL,
  type VARCHAR(256) NOT NULL,
  type_id INTEGER NOT NULL,
  name VARCHAR(256) NOT NULL,
  description TEXT NOT NULL,
  document TEXT NOT NULL,
  dct TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- (*)
) ;

In contrast to MTSamples data representation used in Example 2 (where the full content of a document was stored in the document field in the encounters table), in this example we partially parse the document, hence improving the data representation.

Two additional fields have been added to connect with CogStack Document model:

  • cid – automatically generated unique id,

  • dct – a document creation timestamp.

Please refer to examples/example3/extra/prepare_synsamples_db.sh on how the synthetic data is parsed and to examples/example3/extra/prepare_mtsamples_db.sh on how the MTSamples data is parsed.

Properties files

The properties files used in this example are based on the one from Example 1. However, since multiple views are defined, the properties files can be automatically generated from the provided template.properties file by running, in the examples/example3/cogstack/ directory:

bash gen_config.sh

This way, each generated properties file corresponds to an individual view as defined in the database schema.
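The generation itself can be as simple as substituting a view name into the template – an illustrative sketch only (the placeholder name TEMPLATE_VIEW, the view names and the loop below are assumptions; the actual logic lives in gen_config.sh):

#!/bin/bash
# generate one properties file per database view (illustrative sketch)
for view in patients_view encounters_view observations_view; do
  sed "s/TEMPLATE_VIEW/${view}/g" template.properties > "${view}.properties"
done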

Apart from that, a separate mt.properties file is provided for processing MTSamples data as defined in samples_view.

Deployment information

This example uses the standard stack of microservices (see: Running CogStack), extended with an additional database storing input sample data. It uses 2 separate input databases as the data source: samples-db and mtsamples-db – see: examples/example3/docker/docker-compose.override.yml.

It also uses multiple CogStack properties files, hence multiple instances of the CogStack data processing engine are run, one per properties file.





Example 4

General information

This example covers a common use-case of processing and parsing document data that is stored alongside records in the database. This example is based on Example 2 and extends the CogStack data processing workflow by including Apache Tika module as one of the core components used for processing documents.

Data preparation

Apart from structured records data in text format, this example uses documents which are loaded directly into the input database. The script for generating the documents, prepare_docs.sh, is provided in the main examples directory.

At the moment, the following use-cases have been prepared, with documents generated in these formats:

  • docx – documents in DOCX format of text type,

  • pdf-text – documents in PDF format of text type,

  • pdf-img – documents in PDF format of image type,

  • jpg – documents in JPEG format.

Each use-case can be seen as a standalone example, but using the same database schema, CogStack properties file and docker-compose file. Hence, when generating the docker-based deployment data (by running the setup.sh script), a separate directory will be generated for each use-case.

Database schema

The database schema is based on the one defined in Example 2. Only some minor modifications were made in encounters table and observations_view – see: examples/example4/extra/db_create_schema.sql file.

Encounters table

CREATE TABLE encounters (
  cid SERIAL, --(*)
  id UUID PRIMARY KEY NOT NULL,
  start TIMESTAMP NOT NULL,
  stop TIMESTAMP,
  patient UUID REFERENCES patients,
  code VARCHAR(64) NOT NULL,
  description VARCHAR(256) NOT NULL,
  cost REAL NOT NULL,
  reasoncode VARCHAR(64),
  reasondescription VARCHAR(256),
  binarydocument BYTEA --(*)
) ;

In this example, the document is stored in the binarydocument column of BYTEA type – instead of the document column of raw TEXT type defined in Example 2.

Observations view

CREATE VIEW observations_view AS
SELECT
  p.id AS patient_id,
  -- ...
  enc.binarydocument AS encounter_binary_doc, --(*)
  obs.cid AS observation_id,                  --(*)
  obs.created AS observation_timestamp,       --(*)
  -- ...
FROM
  patients p,
  encounters enc,
  observations obs
WHERE
  enc.patient = p.id AND
  obs.patient = p.id AND
  obs.encounter = enc.id ;

The important field added in this view is encounter_binary_doc which will be used to read the binary document by CogStack engine.

Properties file

The properties file used in this example is based on the one from Example 2, extended with parts covering the Tika document processor, which are described below.

Spring profiles

The Spring profiles part has been updated by adding the tika profile:

spring.profiles.active=jdbc_in,elasticsearchRest,tika,localPartitioning

Tika configuration

A new part covering Tika processing has been added:

tika.tikaFieldName = tika_output
tika.binaryContentSource = database
tika.binaryFieldName = encounter_binary_doc

The property tika.tikaFieldName denotes the name of the key field tika_output. This field will be present in the output JSON document, where its value will hold the content of the Tika-parsed document.

The property tika.binaryContentSource defines the source where the documents are stored – in our case: database. Next, the property tika.binaryFieldName denotes the name of the column that contains the binary document data – in our case, the encounter_binary_doc field in the observations_view view.

It’s important to note that the remaining information about the mapping and querying of the source database tables and record fields is covered by the source.* properties, as explained in Example 1.
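A hypothetical sketch of how a resulting document in ElasticSearch may then look (the field names follow observations_view and the sample data shown earlier; the exact structure of the indexed document may differ):

{
  "patient_id": "b9f5a11b-211d-4ced-b3ba-12012c83b937",
  "encounter_id": "2452cf09-021b-4586-9e33-59d7d2242f31",
  "observation_code": "8302-2",
  "tika_output": "Sample Type / Medical Specialty: Allergy / Immunology ..."
}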

Deployment information

When running setup.sh script, a number of separate directories will be created, each corresponding to a document format use-case.

Apart from that, this example uses the standard stack of microservices (see: Running CogStack) and also uses a single CogStack properties file, running only one instance of CogStack data processing engine.





Example 5

General information

This example covers a bit more complex use-case of processing and parsing EHR data where the documents are stored alongside records in the same database. This example is based both on Example 3 and Example 4, with the difference that only a single job is being run and the data processing workflow is divided into two steps:

  1. Pre-processing and parsing of the documents data,

  2. Processing all the records.

Usually, we would like to perform steps (1) and (2) in the same workflow, as presented in Example 4. However, there may be cases where we prefer to pre-process the data and store it temporarily prior to ingestion into ElasticSearch (or another data sink). This may be a solution for cases where one would like to have more control over the document parsing process, e.g., to easily re-launch parsing of selected or failed documents.

Database schema

The database schema is based on the one from Example 3 with some minor modifications and with additional tables introduced.

Encounters table

In the encounters table, the representation of the document data has been altered:

CREATE TABLE encounters (
  cid SERIAL NOT NULL,
  id UUID PRIMARY KEY,
  start TIMESTAMP NOT NULL,
  stop TIMESTAMP,
  patient UUID REFERENCES patients,
  code VARCHAR(64) NOT NULL,
  description VARCHAR(256) NOT NULL,
  cost REAL NOT NULL,
  reasoncode VARCHAR(64),
  reasondescription VARCHAR(256),
  documentid INTEGER --(*)
) ;