...
Example 1 – Processing a simple, structured dataset from a single DB source.
Example 2 – Processing a combined structured and free-text dataset from a single DB source (as in Quickstart).
Example 3 – Processing a combined dataset from multiple DB sources, multiple jobs.
Example 4 – Processing a combined dataset with embedded documents from a single DB source.
Example 5 – 2-step processing of a combined dataset with embedded documents from a single DB source.
Example 6 – Basic security use case: Example 2 extended with a reverse proxy enabling secure access.
Example 7 – Logging: Example 6 extended with logging mechanisms.
Example 8 – Simple NLP use case: drug annotation using GATE, based on Example 2.
Example 9 – Defining multi-component pipelines: Example 4 and Example 8 combined.
Example 10 – Ingesting free-text data from a DB source into ES, annotating it with MedCAT and re-ingesting it into ES.
Sample production deployment – a structured project setup of files to be used for production deployments.
The main directory with resources used in this tutorial is available in the CogStack bundle under the examples directory.
...
One solution to overcome this issue is to run the PostgreSQL container with additional options specified in the Docker Compose file:
command: "-c 'shared_buffers=256MB' -c 'max_connections=1000'"
which raises the connection limit and increases the RAM available for connection buffers.
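For orientation, a minimal sketch of where this command option sits in the Compose file – the service name samples-db matches the examples in this tutorial, while the image tag is only an assumption:
samples-db:
  image: postgres:alpine  # image tag is an assumption
  command: "-c 'shared_buffers=256MB' -c 'max_connections=1000'"
  ports:
    - "5555:5432"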
...
partitioner.partitionType = PKTimeStamp
partitioner.tableToPartition = <table-name>
partitioner.pkColumnName = <primary-key-column-name>
partitioner.timeStampColumnName = <timestamp-column-name>
...
In addition, the ElasticSearch REST endpoint can be accessed at http://localhost:9200/. It can be used to perform manual queries or by other external services – for example, one can list the available indices:
curl 'http://localhost:9200/_cat/indices'
or query one of the available indices, e.g. sample_observations_view:
curl 'http://localhost:9200/sample_observations_view'
For more information about document querying and modification operations, please refer to the official ElasticSearch documentation.
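As a quick sanity check, one can also count or preview the indexed documents using the standard _count and _search APIs – here assuming the sample_observations_view index created above:
curl 'http://localhost:9200/sample_observations_view/_count'
curl 'http://localhost:9200/sample_observations_view/_search?size=5&pretty'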
As a side note, the name of the ElasticSearch node in the Docker Compose file has been set to elasticsearch-1. The -1 suffix emphasizes that for larger-scale deployments multiple ElasticSearch nodes can be used – typically, a minimum of 3.
PostgreSQL sample database
Moreover, the PostgreSQL database with the input sample data is exposed directly at localhost:5555. The database name is db_samples, with user test and password test. To connect, one can run:
psql -U 'test' -W -d 'db_samples' -h localhost -p 5555
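Once connected, the available relations can be listed and queried directly; alternatively, psql can run one-off commands with the -c flag – for instance (assuming the observations_view used later in Example 1):
psql -U 'test' -W -d 'db_samples' -h localhost -p 5555 -c '\d'
psql -U 'test' -W -d 'db_samples' -h localhost -p 5555 -c 'SELECT COUNT(*) FROM observations_view;'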
...
The parameters for specifying the data source are defined as follows:
source.JdbcPath = jdbc:postgresql://samples-db:5432/db_samples
source.Driver = org.postgresql.Driver
source.username = test
source.password = test
In this example we are using a PostgreSQL database, whose driver is defined by the source.Driver parameter. The PostgreSQL database service is available in the CogStack ecosystem as samples-db, exposes port 5432 for connections, and the sample database name is db_samples – all these details need to be included in the source.JdbcPath parameter field.
Next, we need to instruct the CogStack engine how to query the records from the data source:
source.selectClause = SELECT *
source.fromClause = FROM observations_view
source.sortKey = observations_id
source.primaryKeyFieldValue = observations_id
source.timeStamp = observations_timestamp
source.dbmsToJavaSqlTimestampType = TIMESTAMP
This is where the previously defined observations_view, with its additional CogStack-specific fields, is used.
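For illustration only, a minimal sketch of what such a view definition could look like – the underlying observations table and its column names are assumptions here, as the actual schema ships with the example data:
CREATE VIEW observations_view AS
  SELECT
    o.id      AS observations_id,        -- primary key used for partitioning (assumed column)
    o.created AS observations_timestamp, -- timestamp used for partitioning (assumed column)
    o.*                                  -- the actual observation record fields
  FROM observations o;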
...
elasticsearch.cluster.host = elasticsearch-1
elasticsearch.cluster.port = 9200
Similarly to defining the sample database source, we need to provide the ElasticSearch host and port configuration according to the microservice definitions in the corresponding Docker Compose file (see examples/example1/docker/docker-compose.override.yml).
...
elasticsearch.index.name = sample_observations_view
elasticsearch.excludeFromIndexing = observations_id
We specify the index name which will be used to store the documents processed by the CogStack engine. Additionally, we specify which fields should be excluded from indexing – by default, we usually exclude the binary content, constant-value fields and the primary key of the observations_view.
...
In order to coordinate the workers, the CogStack engine needs to keep information about the current jobs in an additional PostgreSQL database – cogstack-job-repo. Hence, similarly to the source database, this database needs to be specified:
jobRepository.JdbcPath = jdbc:postgresql://cogstack-job-repo:5432/cogstack
jobRepository.Driver = org.postgresql.Driver
jobRepository.username = cogstack
jobRepository.password = mysecretpassword
job.jobName = job_observations_view
The last parameter, job.jobName, is the default name for the jobs that will be created.
...
partitioner.partitionType = PKTimeStamp
partitioner.tableToPartition = observations_view
partitioner.pkColumnName = observations_id
partitioner.timeStampColumnName = observations_timestamp
Apart from data partitioning, it can also be useful to set up the (optional) scheduler – the following line corresponds to its configuration:
scheduler.useScheduling = false
In this example we do not use the scheduler, since we ingest EHRs from the data source only once. However, when data is generated continuously, the scheduler should be enabled to periodically run CogStack jobs that process the new EHRs. It is disabled by default.
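For example, a periodic run could be enabled along the lines of the sketch below; note that the scheduler.rate key and its cron-like value are assumptions here, so please check the CogStack reference configuration for the exact property names:
scheduler.useScheduling = true
# run the ingestion job every hour (assumed cron-like syntax)
scheduler.rate = "0 0 * * * *"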
...
The Spring profiles part has been updated by adding the tika profile:
spring.profiles.active=jdbc_in,elasticsearchRest,tika,localPartitioning
Tika configuration
A new part covering Tika processing has been added:
tika.tikaFieldName = tika_output
tika.binaryContentSource = database
tika.binaryFieldName = encounter_binary_doc
The property tika.tikaFieldName denotes the name of the key field, tika_output. This field will be present in the output JSON document, and its value will hold the content of the Tika-parsed document.
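To illustrate, a processed output document could then look roughly as follows, with invented values and the remaining record fields elided:
{
  "encounter_id": 42,
  "tika_output": "Discharge summary: the patient presented with ...",
  ...
}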
...
The spring profiles used in this step are:
spring.profiles.active=jdbc_in,jdbc_out,tika,localPartitioning
In general, this tells us that the documents will be read from an input database (profile: jdbc_in), processed using tika with the localPartitioning scheme, and stored in an output database (profile: jdbc_out).
...
The source and target databases are specified as follows (in fact, it is the same database):
source.JdbcPath = jdbc:postgresql://samples-db:5432/db_samples
source.Driver = org.postgresql.Driver
source.username = test
source.password = test
target.JdbcPath = jdbc:postgresql://samples-db:5432/db_samples
target.Driver = org.postgresql.Driver
target.username = test
target.password = test
The data source and target binding for CogStack engine is defined as follows:
source.primaryKeyFieldValue = cid
source.timeStamp = dct
source.selectClause = SELECT *
source.fromClause = FROM medical_reports
source.sortKey = cid
target.Sql = INSERT INTO medical_reports_processed (cid, dct, output) VALUES ( CAST( :primaryKeyFieldValue AS integer ), :timeStamp, :outputData )
In this first data processing step we are going to read the data from the medical_reports table – as provided in source.fromClause.
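Since target.Sql inserts into medical_reports_processed, this table needs to exist up front; a minimal sketch of a matching definition, with column types inferred (not confirmed) from the INSERT statement above:
CREATE TABLE medical_reports_processed (
  cid    integer,    -- primary key value carried over from medical_reports
  dct    timestamp,  -- document timestamp
  output text        -- processed record produced by CogStack
);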
...
Tika configuration
tika.binaryFieldName = binarydoc
tika.tikaFieldName = tika_output
tika.binaryContentSource = database
The property tika.tikaFieldName denotes the name of the key field tika_output in the output JSON file, whose value will contain the content of the Tika-parsed document. See, e.g., the reports_processed_view, where the content of tika_output is accessed and parsed.
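For illustration, such a view could expose the parsed text using PostgreSQL's JSON operators, roughly as sketched below – the actual reports_processed_view shipped with the example may be defined differently:
CREATE VIEW reports_processed_view AS
  SELECT
    cid,
    dct,
    output::json ->> 'tika_output' AS tika_output -- extract the Tika-parsed text
  FROM medical_reports_processed;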
...
Regarding the Docker Compose configuration file, an additional logging section has been added for each microservice used – e.g., in the case of the CogStack pipeline:
cogstack-pipeline:
  image: cogstacksystems/cogstack-pipeline:latest
  ...
  logging:
    driver: "fluentd"
    options:
      tag: cog.java.engine
"fluentd"
is used as the logging driver
. All the messages from the cogstack
microservice will be forwarded to the fluentd driver using cog.java.engine
as tag
. The directory with the output logs from fluentd running container will be mapped to a local path in the deployment directory: examples/example7/__deploy/__logs
. For the full configuration of running microservices, please refer to examples/example7/docker/docker-compose.override.yml
.
...
To parse the logs, one easy way is to use jq – a flexible command-line JSON processor. For example, to extract the log message field, one may use:
jq ".log" example7/__deploy/__logs/<filename>.log
...
The Spring profiles part has been updated by adding the gate profile:
spring.profiles.active=jdbc_in,elasticsearchRest,gate,localPartitioning
GATE configuration
A new part covering document processing using a custom GATE application has been added:
gate.gateHome = /gate/home/
gate.gateApp = /gate/app/drug.gapp
gate.fieldsToGate = encounter_document
gate.gateAnnotationTypes = Drug
gate.gateFieldName = gate
The property gate.gateHome denotes the home directory of the GATE application, which should be the same for all GATE applications when using the CogStack GATE image from Docker Hub (please see below). gate.gateApp denotes the name of the GATE application to be run – in our example, the application directory (containing the gapp application with resources) will be mounted directly into the CogStack container under the /gate/app/ directory.
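A minimal sketch of how such a mount could be declared in the Docker Compose file – the local gate/app path is an assumption here, as the actual paths are defined in the Example 8 compose file:
cogstack-pipeline:
  ...
  volumes:
    - ./gate/app:/gate/app:ro # mount the GATE application read-only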
...
The properties file used in this example is based on both Example 4 and Example 8. The Spring active profiles used are both tika and gate. However, the most important bit is to specify how the document text is passed from the Tika processor to the GATE document processor:
## TIKA CONFIGURATION ##
# ...
tika.binaryFieldName = encounter_binary_doc
tika.tikaFieldName = tika_output
# ...
## GATE CONFIGURATION ##
# ...
gate.fieldsToGate = tika_output
gate.gateFieldName = gate
# ...
The Tika item processor will extract the text from the document initially stored in binary form in the encounter_binary_doc field (property: tika.binaryFieldName; see Example 4 for the DB schema). Then, it will store the extracted text in the tika_output field (property: tika.tikaFieldName) in the Document model. The GATE application will then read the text from the tika_output field (property: gate.fieldsToGate), process it and store the extracted annotations in the gate field (property: gate.gateFieldName) in the Document model. At the end of processing the record, a resulting JSON with all the available fields will be generated and sent to ElasticSearch.
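To make the data flow concrete, the final JSON for a single record could look roughly as follows – the values and the shape of the gate field are invented here, as the actual annotation format depends on the GATE application:
{
  "encounter_id": 42,
  "tika_output": "Patient was prescribed aspirin ...",
  "gate": "... extracted Drug annotations ...",
  ...
}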
...