Components
- 1: Document
- 2: Events
- 3: Publisher
- 4: Runner
- 5: Worker
- 6: Config
- 7: Indexer
- 8: Stages
- 8.1: EmbeddedPython
- 8.2: ExternalPython
- 8.3: PromptOllama
- 8.4: QueryOpensearch
- 9: File Handlers
- 10: Connectors
- 10.1: RSS Connector
- 10.2: File Connector
- 11: Pipeline
1 - Document
A Lucille Document is the basic unit of data that gets sent through a pipeline and indexed.
Documents are sets of named fields, which may hold single values or lists of values, represented in JSON. Each Document has a unique id.
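For example, a Document with hypothetical field names (mirroring examples used later in this guide) might look like this:

{
  "id": "doc-1",
  "title": "Hello",
  "categories": ["news", "local"]
}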
2 - Events
Lucille Events
As a document passes through various stages of the Lucille ETL pipeline, and as the documents are handled by the indexer, Event messages are generated.
Connectors listen for these events to ensure that all of the documents sent for processing are successfully processed and accounted for.
Errors during processing are reported and tracked via these event messages so the connector can report back the overall success or failure of the execution.
Event Topics
A Kafka Topic that contains event messages. The event messages are sent from stages in the Lucille Pipeline as well as from the indexer.
The event topic name is based on a run ID which is stamped on documents that are flowing through the system. Whenever events are enabled, documents need to have a run ID on them.
In the case where there is no runner to create the run ID and no Lucille publisher to stamp that run ID on the documents, the “third party publisher” that is putting Document JSON onto Kafka would need to include a run ID on those documents. It could choose its own run ID to use.
3 - Publisher
Publisher provides a way to publish documents for processing by the pipeline. When published, a Lucille document becomes available for consumption by any active pipeline Worker.
The Publisher is aware of every document in the run that needs to be indexed, and determines when a run can end by tracking the events received for each published document.
Publisher also:
- Is responsible for stamping a designated run_id on each published document and maintaining accounting details specific to a run.
- Accepts incoming events related to published documents and their children (via an in-memory queue or a Kafka Event Topic).
4 - Runner
When you run Lucille at the command line in standalone mode, you are invoking the Runner. When invoked, the runner reads the configuration file and then begins a Lucille run by launching the appropriate connector(s) and publisher. The runner generates a runId per Lucille run and terminates based on messages sent by the Publisher.
What the Runner invokes can be thought of as an end-to-end Lucille Run.
A Lucille Run is a sequence of connectors to be run, one after the other. Each connector feeds a specific pipeline. A run can include multiple connectors feeding multiple pipelines.
5 - Worker
6 - Config
Lucille Configuration
When you run Lucille, you provide a path to a file which provides configuration for your run. Configuration (Config) files use HOCON, a superset of JSON. This file defines all the components in your Lucille run.
Quick references
- Example (local single-process): application-example.conf
- Example (S3 OpenSearch): s3-opensearch.conf
- HOCON / Typesafe Config docs: lightbend/config
A complete config file must contain three elements (Connector(s), Pipeline(s), Indexer):
Connectors
Connectors read data from a source and emit it as a sequence of individual Documents, which will then be sent to a Pipeline for enrichment.
connectors should be populated with a list of Connector configurations.
See Connectors for more information about configuring Connectors.
Pipeline and Stages
A pipeline is a list of Stages that will be applied to incoming Documents, preparing them for indexing. As each Connector executes, the Documents it publishes can be processed by a Pipeline, made up of Stages.
pipelines should be populated with a list of Pipeline configurations. Each Pipeline needs two values: name,
the name of the Pipeline, and stages, a list of the Stages to use. Multiple connectors may feed to the same Pipeline.
See Stages for more information about configuring Stages.
Indexer
An indexer sends processed Documents to a specific destination. Only one Indexer can be defined; all pipelines will feed to the same Indexer.
A full indexer configuration has two separate config blocks: first, the generic indexer configuration, and second, configuration for the specific indexer
used in your run. For example, to use the SolrIndexer, you provide separate indexer and solr config blocks.
See Indexers for more information about configuring your Indexer.
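Putting the three elements together, a minimal config file has this shape. This is a sketch: the connector, stage, and indexer shown here are documented elsewhere in this guide, and the values are illustrative.

connectors: [
  {
    name: "connector1"
    class: "com.kmwllc.lucille.connector.SequenceConnector"
    pipeline: "pipeline1"
    numDocs: 10
  }
]

pipelines: [
  {
    name: "pipeline1"
    stages: [
      {class: "com.kmwllc.lucille.stage.Print"}
    ]
  }
]

indexer {
  type: "CSV"
}

csv {
  path: "/path/to/output.csv"
}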
Other Run Configuration
In addition to those three elements, you can also configure other parts of a Lucille run (a sketch follows this list):
- publisher - Define the queueCapacity.
- log
- runner
- zookeeper
- kafka - Provide a consumerPropertyFile, producerPropertyFile, adminPropertyFile, and other configuration.
- worker - Control how many threads you want, the maxRetries in Zookeeper, and more.
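For example (a sketch using only the properties named above; all values and file paths are illustrative):

publisher {
  queueCapacity: 10000
}

worker {
  threads: 4
}

kafka {
  consumerPropertyFile: "conf/kafka-consumer.properties"
  producerPropertyFile: "conf/kafka-producer.properties"
  adminPropertyFile: "conf/kafka-admin.properties"
}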
Validation
Lucille validates the Config you provide for Connectors, Stages, and Indexers. For example, in a Stage, if you provide a property the Stage does not use, an Exception will be thrown. An Exception will also be thrown if you do not provide a property required by the Stage.
If you want to validate your config file without starting an actual run, you can use the command-line validation tool: just add -validate to the end of the command you use to run Lucille. Any errors in your config will be printed to the console, and no actual run will take place.
Config Validation
Lucille components (like Stages, Indexers, and Connectors) each take in a set of specific arguments to configure the component correctly.
Sometimes, certain properties are required - like the pathsToStorage for your FileConnector traversal, or the path for your
CSVIndexer. Other properties are optional / do not always have to be specified.
For these components, developers must declare a Spec which defines the properties that are required or optional. They must
also declare what type each property is (number, boolean, string, etc.). For example, the SequenceConnector requires you
to specify the numDocs you want to create, and optionally, the number you want IDs to startWith. So, the Spec looks like this:
public static final Spec SPEC = SpecBuilder.connector()
.requiredNumber("numDocs")
.optionalNumber("startWith")
.build();
Declaring a Spec
Lucille is designed to access Specs reflectively. If you build a Stage/Indexer/Connector/File Handler, you need to declare a public static Spec
named SPEC (exactly). Failure to do so will not result in a compile-time error. However, you will not be able
to instantiate your component - even in unit tests - as the reflective access (which takes place in the super / abstract class) will always fail.
When you declare the public static Spec SPEC, you’ll want to call the appropriate SpecBuilder method which provides appropriate
default arguments for your component. For example, if you are building a Stage, you should call SpecBuilder.stage(), which allows
the config to include name, class, conditions, and conditionPolicy.
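For example, a Stage taking the two optional parameters of the AddRandomBoolean Stage (covered later in Stages) might declare its Spec like this - a sketch using only the builder methods shown in this guide:

public static final Spec SPEC = SpecBuilder.stage()
    .optionalString("field_name")
    .optionalNumber("percent_true")
    .build();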
Lists and Objects
Validating a list / object is a bit tricky. When you declare a required / optional list or object in a Config, you can provide either:
- A TypeReference describing what the unwrapped List/Object should deserialize/cast to.
- A Spec (for a list) or a named Spec created via SpecBuilder.parent() (for an object), describing the valid properties. Use a Spec for a list when you need a List<Config> with specific structure - for example, Stage conditions.
Parent / Child Validation
Some configs include properties which are objects, containing even more properties. For example, in the FileConnector, you
can specify fileOptions - which includes a variety of additional arguments, like getFileContent, handleArchivedFiles, and more.
This is defined in a parent Spec, created via SpecBuilder.parent(), which has a name (the key the config is held under) and has its own required/optional properties.
The fileOptions parent Spec is:
SpecBuilder.parent("fileOptions")
.optionalBoolean("getFileContent", "handleArchivedFiles", "handleCompressedFiles")
.optionalString("moveToAfterProcessing", "moveToErrorFolder").build();
A parent Spec can be either required or optional. When the parent is present, its properties will be validated against this parent Spec.
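With that Spec, a fileOptions block like the following would validate (the values are illustrative):

fileOptions {
  getFileContent: false
  handleArchivedFiles: true
  moveToAfterProcessing: "/path/to/processed"
}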
There will be times when you can’t know the field names in advance - a field mapping of some kind, for example.
In this case, you should pass in a TypeReference describing what type the unwrapped ConfigObject should deserialize/cast to.
For example, if you want a field mapping of Strings to Strings, you’d pass in new TypeReference<Map<String, String>>(){}.
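In a config, such a mapping might look like this (the fieldMapping key and the field names are hypothetical):

fieldMapping {
  source_title: "title"
  source_author: "author"
}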
In general, you should use a Spec when you know field names, and a TypeReference when you don’t.
Why Validate?
Obviously, you will get an error if you call config.getString("field") when "field" is not specified. However, additional validation
on Configs is still useful/necessary for two primary reasons:
- Command-line utility
We want to allow the command-line validation utility to provide a comprehensive list of a Config’s errors. As such, Lucille has to
validate the config before a Stage/Connector/Indexer begins accessing properties and potentially throwing a ConfigException.
- Prevent typos from ruining your pipeline
A mistyped field name could have massive ripple effects throughout your pipeline. As such, each Stage/Connector/Indexer needs to have a specific set of legal Config properties, so Exceptions can be raised for unknown or unrecognized properties.
7 - Indexer
Indexers
An Indexer is a thread that retrieves processed Documents from the end of a Pipeline and sends them in batches to a specific destination. For users of Lucille, this destination will most commonly be a search engine.
Only one Indexer can be defined in a Lucille run. All pipelines will feed to the same Indexer.
Indexer configuration has two parts:
- the generic indexer configuration
- configuration for the specific implementation you are using.
For example, if you are using Solr, you’d provide a solr config - or elastic for Elasticsearch, csv for CSV, etc.
Here’s what using the SolrIndexer might look like:
# Generic indexer config
indexer {
type: "solr"
ignoreFields: ["city_temp"]
batchSize: 100
}
# Specific implementation (Solr) config
solr {
useCloudClient: true
url: "localhost:9200"
defaultCollection: "test_index"
}
At a minimum, indexer must contain either type or class. type is shorthand for an indexer provided by lucille-core -
it can be "Solr", "OpenSearch", "ElasticSearch", or "CSV". indexer can contain a variety of additional properties as well.
Some Indexers do not support certain properties, however. For example, OpenSearchIndexer and ElasticsearchIndexer do not support
indexer.indexOverrideField.
The lucille-core module contains a number of commonly used indexers. Additional indexers with a large number of dependencies are provided as optional plugin modules.
Lucille Indexers (Core)
Lucille Indexers (Plugins)
8 - Stages
Lucille Stages
Stages are the building blocks of a Lucille pipeline. Each Stage performs a specific transformation on a Document.
Lucille Stages should have JavaDocs that describe their purpose and the parameters acceptable in their Config. On this site, you’ll find more in-depth documentation for some more advanced / complex Lucille Stages.
To configure a Stage, you have to provide its class (under class) in its config. You can also specify a name for the Stage, along with conditions and a conditionPolicy (described below).
You’ll also provide the parameters needed by the Stage. For example, the AddRandomBoolean Stage accepts two optional parameters,
field_name and percent_true. So, an AddRandomBoolean Config would look something like this:
{
name: "AddRandomBoolean-First"
class: "com.kmwllc.lucille.stage.AddRandomBoolean"
field_name: "rand_bool_1"
percent_true: 65
}
Conditions
For any Stage, you can specify “conditions” in its Config, controlling when the Stage will process a Document. Each
condition has a required parameter, fields, and two optional parameters, operator and values.
- fields is a list of field names that will determine whether the Stage applies to a Document.
- values is a list of values that the conditional fields will be searched for. (If not specified, only the existence of the fields is checked.)
- operator is either "must" or "must_not" (defaults to "must").
In the root of the Stage’s Config, you can also specify a conditionPolicy - either "any" or "all", specifying whether
any or all of your conditions must be met for the Stage to process a Document. (Defaults to "any".)
Let’s say we are running the Print Stage, but we only want it to execute on a Document where city = Boston or city = New York.
Our Config for this Stage would look something like this:
{
name: "print-1"
class: "com.kmwllc.lucille.stage.Print"
conditions: [
{
fields: ["city"]
values: ["Boston", "New York"]
}
]
}
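If we instead wanted the Stage to run only when every condition is met - say, city is Boston or New York, and the Document does not have a redacted field (a hypothetical field name) - we could combine a conditionPolicy with the operator parameter:

{
  name: "print-1"
  class: "com.kmwllc.lucille.stage.Print"
  conditionPolicy: "all"
  conditions: [
    {
      fields: ["city"]
      values: ["Boston", "New York"]
    },
    {
      fields: ["redacted"]
      operator: "must_not"
    }
  ]
}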
8.1 - EmbeddedPython
Why Use It?
EmbeddedPython executes per-document Python code inside the Lucille JVM using GraalPy. Instead of returning a JSON object, your script mutates the current document directly through a Python-friendly proxy bound as doc (and the raw Java document as rawDoc). This avoids ports, subprocesses, venvs, and per-document JSON round trips.
When To Use It
Use EmbeddedPython when you need one or more of the following:
- Minimal operational overhead (ports, subprocess lifecycle, venv creation, pip installs).
- No use of any external Python libraries or native dependencies that require a real Python environment.
- Lightweight field enrichment/transformation.
When To Use ExternalPython Instead
Avoid EmbeddedPython and use ExternalPython when you need one or more of the following:
- Real Python compatibility (including packages with native dependencies).
- Dependency management via a requirements.txt installed into a managed venv.
- Process isolation apart from the JVM.
Example
Input Document
{
"id": "doc-1",
"title": "Hello",
"author": "Test",
"views": 123
}
Python Script
doc["title"] = doc["title"].upper()
Output Document
{
"id": "doc-1",
"title": "HELLO",
"author": "Test",
"views": 123
}
Config Parameters
{
name: "EmbeddedPython-Example"
class: "com.kmwllc.lucille.stage.EmbeddedPython"
# Specify exactly one of the following:
script_path: "/path/to/my_script.py"
script: "doc['title'] = doc['title'].upper()"
}
8.2 - ExternalPython
Why Use It?
ExternalPython delegates per-document processing to an external Python process using Py4J. Lucille serializes the Document into a request, calls a Python function, receives a JSON response, and applies that response back onto the document.
When To Use It
Use ExternalPython when you need one or more of the following:
- Real Python compatibility (including packages with native dependencies).
- Dependency management via a requirements.txt installed into a managed venv.
- Process isolation apart from the JVM.
When To Use EmbeddedPython Instead
Avoid ExternalPython and use EmbeddedPython when you need one or more of the following:
- Minimal operational overhead (ports, subprocess lifecycle, venv creation, pip installs).
- No use of any external Python libraries or native dependencies that require a real Python environment.
- Lightweight field enrichment/transformation.
Restrictions
Your Python file must be located in one of the following directories, relative to the current working directory running Lucille:
- ./python
- ./src/main/resources
- ./src/test/resources
- ./src/test/resources/ExternalPythonTest (for testing)
Example
Input Document
{
"id": "doc-1",
"title": "Hello",
"author": "Test",
"views": 123
}
Python Script
def process_document(doc):
    title = doc["title"]
    return {
        "title": title.upper()
    }
Python Returns
{
"title": "HELLO"
}
Output Document
{
"id": "doc-1",
"title": "HELLO"
}
Config Parameters
{
name: "ExternalPython-Example"
class: "com.kmwllc.lucille.stage.ExternalPython"
scriptPath: "/path/to/my_script.py"
# Optional
pythonExecutable: "python3"
requirementsPath: "/path/to/requirements.txt"
functionName: "process_document"
port: 25333
}
Example (NumPy)
Input Document
{
"id": "doc-2",
"values": [1, 2, 3, 4, 5]
}
Python Script
import numpy as np

def process_document(doc):
    arr = np.array(doc["values"], dtype=float)
    return {
        "values": doc["values"],
        "mean": float(np.mean(arr)),
        "stddev": float(np.std(arr))
    }
Output Document
{
"id": "doc-2",
"values": [1, 2, 3, 4, 5],
"mean": 3.0,
"stddev": 1.41
}
requirements.txt
numpy
Config Parameters
{
name: "ExternalPython-Numpy"
class: "com.kmwllc.lucille.stage.ExternalPython"
scriptPath: "/path/to/my_numpy_script.py"
requirementsPath: "/path/to/requirements.txt"
# Optional
pythonExecutable: "python3"
functionName: "process_document"
port: 25333
}
8.3 - PromptOllama
What if you could just, actually, put an LLM on everything?
Ollama
Ollama allows you to run a variety of Large Language Models (LLMs) with minimal setup. You can also create custom models using Modelfiles and system prompts.
The PromptOllama Stage allows you to connect to a running instance of Ollama Server, which communicates with an LLM through a simple API.
The Stage sends part (or all) of a Document to the LLM for generic enrichment. You’ll want to create a custom model (with a Modelfile)
or provide a System Prompt in the Stage Config that is tailored to your pipeline.
We strongly recommend you have the LLM output only a JSON object for two main reasons: Firstly, LLMs tend to follow instructions better when instructed to do so. Secondly, Lucille can then parse the JSON response and fully integrate it into your Document.
Example
Let’s say you are working with Documents which represent emails, and you want to monitor them for potential signs of fraud. Lucille doesn’t
have a DetectFraud Stage (at time of writing), but you can use PromptOllama to add this information with an LLM.
- Modelfile: Let’s say you created a custom model, fraud_detector, in your instance of Ollama Server. As part of the Modelfile, you instruct the model to check the contents for fraud and output a JSON object containing just a boolean value (under fraud). Your Stage would be configured like so:
{
name: "Ollama-Fraud"
class: "com.kmwllc.lucille.stage.PromptOllama"
hostURL: "http://localhost:11434"
modelName: "fraud_detector"
fields: ["email_text"]
}
- System Prompt: You can also just reference a specific LLM directly, and provide a system prompt in the Stage configuration.
{
name: "Ollama-Fraud"
class: "com.kmwllc.lucille.stage.PromptOllama"
hostURL: "http://localhost:11434"
modelName: "gemma3"
systemPrompt: "You are to read the text inside \"email_text\" and output a JSON object containing only one field, fraud, a boolean, representing whether the text contains evidence of fraud or not."
fields: "email_text"
}
Regardless of the approach you choose, the LLM will receive a request that looks like this:
{
"email_text": "Let's be sure to juice the numbers in our next quarterly earnings report."
}
(Since fields: ["email_text"], any other fields on this Document are not part of the request.)
And the response from the LLM should look like this:
{
"fraud": true
}
Lucille will then add all key-value pairs in this response JSON into your Document. So, the Document will become:
{
"id": "emails.csv-85",
"run-id": "f9538992-5900-459a-90ce-2e8e1a85695c",
"email_text": "Let's be sure to juice the numbers in our next quarterly earnings report.",
"fraud": true
}
As you can see, PromptOllama is very versatile, and can be used to enrich your Documents in a lot of ways.
8.4 - QueryOpensearch
OpenSearch Templates
You can use templates in OpenSearch to repeatedly run a certain query using different parameters. For example, if we have an index full of parks, and we want to search for a certain park, we might use a template like this:
{
"source": {
"query": {
"match_phrase": {
"park_name": "{{park_to_search}}"
}
}
}
}
In OpenSearch, you could then call this template (providing it park_to_search) instead of writing out the full query each time you want to search.
Templates can also have default values. For example, if you want park_to_search to default to “Central Park” when a value is not provided,
it would be written as: "park_name": "{{park_to_search}}{{^park_to_search}}Central Park{{/park_to_search}}"
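The full template, with the default in place, would then look like this:

{
  "source": {
    "query": {
      "match_phrase": {
        "park_name": "{{park_to_search}}{{^park_to_search}}Central Park{{/park_to_search}}"
      }
    }
  }
}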
QueryOpensearch Stage
The QueryOpensearch Stage executes a search template, using certain fields from a Document as the parameters, and adds OpenSearch’s response to the Document.
You’ll specify either templateName, the name of a search template you’ve saved, or searchTemplate, the template you want to execute, in your Config.
You’ll also need to specify the names of parameters in your search template. These will need to match the names of fields on your Documents.
If your names don’t match, you can use the RenameFields Stage first.
In particular, you have to specify which parameters are required and which are optional. If a required name in requiredParamNames is
missing from a Document, an Exception will be thrown, and the template will not be executed. If an optional name in optionalParamNames
is missing, it (naturally) won’t be part of the template execution, so OpenSearch will use the default value.
If a parameter without a default value is missing, OpenSearch doesn’t throw an Exception - it just returns an empty response with zero hits.
So, it is very important that requiredParamNames and optionalParamNames are defined very carefully!
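A config for this Stage might look something like this (a sketch: only the parameters described above are shown, the saved template name is hypothetical, and any connection settings your deployment needs are omitted):

{
  name: "query-parks"
  class: "com.kmwllc.lucille.stage.QueryOpensearch"
  templateName: "park_search"
  requiredParamNames: ["park_to_search"]
  optionalParamNames: []
}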
9 - File Handlers
File Handlers accept an InputStream for processing, and return the Documents they extract in an Iterator.
The provided InputStream and any other underlying resources are closed when the Iterator returns false for hasNext().
As such, when working directly with these File Handlers, it is important to exhaust the Iterators they return.
File Handlers (Core)
- CSV File Handler: Extracts documents from a csv file.
- JSON File Handler: Extracts documents from a json (or jsonl) file.
- XML File Handler: Extracts documents from an xml file.
Custom File Handlers
Developers can implement and use custom File Handlers as needed. Extend BaseFileHandler to get started. To use a custom
FileHandler, you have to reference its class in its Config. This is not needed when using the File Handlers provided by Lucille.
You can override the File Handlers provided by Lucille, as well - just include the class you want to use in the Config.
10 - Connectors
Lucille Connectors
Lucille Connectors are components that retrieve data from a source system, package the data into “Documents”, and publish them to a pipeline.
To configure a Connector, you have to provide its class (under class) in its config. You also need to specify a name for the Connector.
Optionally, you can specify the pipeline, a docIdPrefix, and whether the Connector requires a Publisher to collapse.
You’ll also provide the parameters needed by the Connector as well. For example, the SequenceConnector requires one parameter, numDocs,
and accepts an optional parameter, startWith. So, a SequenceConnector Config would look something like this:
{
name: "Sequence-Connector-1"
class: "com.kmwllc.lucille.connector.SequenceConnector"
docIdPrefix: "sequence-connector-1-"
pipeline: "pipeline1"
numDocs: 500
startWith: 50
}
The lucille-core module contains a number of commonly used connectors. Additional connectors with a large number of dependencies are provided as optional plugin modules.
Lucille Connectors (Core)
- Abstract Connector - Base implementation for a Connector.
- Database Connector
- File Connector
- Sequence Connector - Generates a certain number of empty Documents.
- Solr Connector
- RSS Connector
The following connectors are deprecated. Use FileConnector instead, along with a corresponding FileHandler.
Lucille Connectors (Plugins)
10.1 - RSS Connector
The RSSConnector
The RSSConnector allows you to publish Documents representing the items found in an RSS feed of your choice. Each Document will
(optionally) contain fields from the RSS items, like the author, description, title, etc. By default, the Document IDs will be the
item’s guid, which should be a unique identifier for the RSS item.
You can configure the RSSConnector to only publish recent RSS items, based on the pubDate found on the items.
Also, it can run incrementally, refreshing the RSS feed after a certain amount of time until you manually stop it. The RSSConnector
will avoid publishing Documents for the same RSS item more than once.
The Documents published may have any of the following fields, depending on how the RSS feed is structured:
- author (String)
- categories (List<String>)
- comments (List<String>)
- content (String)
- description (String)
- enclosures (List<JsonNode>). Each JsonNode contains:
  - type (String)
  - url (String)
  - length (Long), which may not always be present
- guid (String)
- isPermaLink (Boolean)
- link (String)
- title (String)
- pubDate (Instant)
10.2 - File Connector
The file connector traverses a file system and publishes Lucille documents representing its findings. In your Configuration, specify
pathsToStorage, representing the path(s) you want to traverse. Each path can be a path to the local file system or a URI for a supported
cloud provider.
Working with Cloud Storage
When you are providing FileConnector with URIs to cloud storage, you also need to apply the appropriate configuration for any
cloud providers used. For each provider, you’ll need to provide a form of authentication; you can optionally
specify the maximum number of files (maxNumOfPages) that Lucille will load into memory for a given request.
- Azure: Specify the needed options in azure in your Config. You must provide connectionString, or you must provide accountName and accountKey.
- Google: Specify the needed options in gcp in your Config. You must provide pathToServiceKey.
- S3: Specify the needed options in s3 in your Config. You must provide accessKeyId, secretAccessKey, and region. For URIs, pathsToStorage must be percent-encoded for special characters (e.g., s3://test/folder%20with%20spaces).
- For each of these providers, you can optionally include maxNumOfPages in their configuration as well. (A sketch of these blocks follows this list.)
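For example (a sketch; the credential values are placeholders):

s3 {
  accessKeyId: "YOUR_ACCESS_KEY"
  secretAccessKey: "YOUR_SECRET_KEY"
  region: "us-east-1"
  maxNumOfPages: 100
}

gcp {
  pathToServiceKey: "/path/to/service-key.json"
}

azure {
  connectionString: "YOUR_CONNECTION_STRING"
}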
Applying FileHandlers
Some of the files that your FileConnector encounters will, themselves, contain data that you want to extract more documents from! For example, the FileConnector
may encounter a .csv file, where each row itself represents a Document to be published. This is where FileHandlers come in - they will individually process these files
and create more Lucille documents from their data. See File Handlers for more.
In order to use File Handlers, you need to specify the appropriate configuration within your Config - specifically, each File Handler
you want to use will be a map within the fileOptions you specify. You can use csv, json, or xml.
See the documentation for each File Handler to see what arguments are needed / accepted.
File Options
File options determine how you handle and process files you encounter during a traversal. Some commonly used options include (see the sketch after this list):
- getFileContent: Whether, during traversal, the FileConnector should add an array of bytes representing the file’s contents to the Lucille document it publishes. This will slow down traversal significantly and is resource intensive. On the cloud, this will download the file contents.
- handleArchivedFiles / handleCompressedFiles: Whether you want to handle archive or compressed files, respectively, during your traversal. For cloud files, this will download the file’s contents.
- moveToAfterProcessing: A path to move files to after processing.
- moveToErrorFolder: A path to move files to if an error occurs.
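For example, a fileOptions block that fetches file contents, handles compressed files, and applies the CSV File Handler might look like this (a sketch; the csv map takes whatever arguments the CSV File Handler's own documentation describes):

fileOptions {
  getFileContent: true
  handleCompressedFiles: true
  moveToAfterProcessing: "/path/to/processed"
  csv {
    # arguments for the CSV File Handler, per its documentation
  }
}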
Filter Options
Filter options determine which files will/won’t be processed & published in your traversal. All filter options are optional. If you specify multiple filter options, files must comply with all of them to be processed & published. (See the sketch after this list.)
- includes: A list of patterns for the only file names that you want to include in your traversal.
- excludes: A list of patterns for file names that you want to exclude from your traversal.
- lastModifiedCutoff: Filter out files that haven’t been modified recently. For example, specify "1h", and only files modified within the last hour will be processed & published.
- lastPublishedCutoff: Filter out files that were recently published by Lucille. For example, specify "1h", and only files published by Lucille more than an hour ago (or never published) will be processed & published. Requires you to provide state configuration - otherwise, it will not be enforced!
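For example (a sketch; the filterOptions key follows the FilterOptions naming used below, and the patterns - shown here as regular expressions - are illustrative assumptions):

filterOptions {
  includes: [".*\\.csv$"]
  excludes: [".*tmp.*"]
  lastModifiedCutoff: "1h"
}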
State
The File Connector can keep track of when files were last known to be published by Lucille. This allows you to use FilterOptions.lastPublishedCutoff and
avoid repeatedly publishing the same files in a short period of time.
In order to use state with the File Connector, you’ll need to configure a connection to a JDBC-compatible database. The database can be embedded, or it can be remote.
It’s important to note that File Connector state is designed to be efficient and lightweight. As such, keep a few points in mind:
- Files that were recently moved / renamed will not have the lastPublishedCutoff applied.
- In your File Connector configuration, it is important that you consistently capitalize directory names in your pathsToStorage, if you are using state.
- Each database table should be used for only one connector configuration.
11 - Pipeline
A Pipeline refers to the complete sequence of stages that can be configured to transform documents with Lucille.
In the most common architecture, a Pipeline is fed by a Connector and emits documents to an Indexer.