Neo4j

Neo4j Connector #

Register Neo4j Connector #

curl -XPUT "http://localhost:9000/connector/neo4j?replace=true" -d '
{
  "name" : "Neo4j Connector",
  "description" : "Fetch data from Neo4j graph database using Cypher.",
  "category" : "database",
  "icon" : "/assets/icons/connector/neo4j/icon.png",
  "tags" : [
    "graph",
    "database",
    "neo4j"
  ],
  "url" : "http://coco.rs/connectors/neo4j",
  "assets" : {
    "icons" : {
      "default" : "/assets/icons/connector/neo4j/icon.png"
    }
  },
  "processor": {
    "enabled": true,
    "name": "neo4j"
  }
}'

Use neo4j as the connector ID (the last segment of the request path), because it is a built-in connector.

Use the Neo4j Connector #

The Neo4j connector runs a Cypher query, transforms the resulting rows into documents, and streams them into coco-server.

Configure Neo4j Datasource #

Connection URI: Bolt/Neo4j URI. Supported schemes include neo4j://, neo4j+s://, bolt://, etc.

Database: Optional database name when running in multi-database installations.

Cypher Query: The statement that returns the records you want to index. The connector treats each row as a document candidate.

Query Parameters: Optional key/value pairs passed to the Cypher query.

Enable Pagination: Enables driver-level pagination (recommended for large result sets).

Page Size: Number of records fetched per page when pagination is enabled. Defaults to 500.

Field Mapping: Optional mapping that aligns result fields with document properties (id, title, content, metadata, etc.).
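
As a rough illustration of what a field mapping does, the sketch below copies values from one result row into a document. It assumes that mapping keys are document properties and mapping values are result field names, which the examples on this page suggest but do not state. apply_field_mapping is a hypothetical helper, not part of coco-server.

```python
def apply_field_mapping(row, mapping):
    """Build a document from one result row.

    Assumption: each mapping entry reads "document property" -> "result field".
    Result fields that are missing from the row are skipped.
    """
    document = {}
    for doc_field, result_field in mapping.items():
        if result_field in row:
            document[doc_field] = row[result_field]
    return document


row = {"id": "doc-1", "title": "Intro", "content": "Hello", "eid": "4:abc:1"}
mapping = {"id": "id", "title": "title", "content": "content", "updated": "updated"}
document = apply_field_mapping(row, mapping)
```

Here the row has no updated field, so the resulting document carries only id, title, and content. Unmapped result fields such as eid are dropped in this sketch; the real connector may treat them differently.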

Incremental Sync Fields #

Neo4j incremental sync relies on two expressions:

  • Watermark Property: a Cypher expression returned by your query that increases monotonically (e.g., coalesce(n.updated_at, n.created_at)).
  • Tie-breaker Property: a secondary expression that uniquely orders records sharing the same watermark value (e.g., elementId(n)).
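
The ordering these two expressions define can be pictured as a comparison on (watermark, tie-breaker) pairs. The Python sketch below is illustrative only: rows_after_cursor is a hypothetical name, and the rows are plain tuples rather than real query results.

```python
def rows_after_cursor(rows, cursor):
    """Return rows strictly after the cursor, ordered by (watermark, tie_breaker).

    rows   -- iterable of (watermark, tie_breaker, payload) tuples
    cursor -- (watermark, tie_breaker) of the last row already synced, or None
    """
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    if cursor is None:
        return ordered
    # Tuple comparison is lexicographic: a row qualifies when its watermark is
    # greater, or when the watermark is equal and its tie-breaker is greater.
    return [r for r in ordered if (r[0], r[1]) > cursor]


rows = [
    (2, "b", "second update, node b"),
    (1, "a", "first update"),
    (2, "a", "second update, node a"),
    (3, "a", "third update"),
]
pending = rows_after_cursor(rows, (2, "a"))
```

With the cursor at (2, "a"), the row (2, "b", ...) is still returned even though it shares the watermark value 2. Without the tie-breaker, such a row would be either skipped or re-read.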

Example snippet inside the datasource config:

incremental:
  enabled: true
  mode: property_watermark
  property: n.updated_at
  property_type: datetime
  tie_breaker: elementId(n)
  resume_from: 2025-01-01T00:00:00Z

During the first run, you can optionally set resume_from to seed the starting watermark. Afterwards, coco-server persists the last (property, tie_breaker) tuple and resumes from there.
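
One way to picture how the starting position is chosen is the sketch below. It is not coco-server code: parse_watermark and initial_cursor are hypothetical helpers, and the behavior when neither a persisted tuple nor resume_from exists (a full read) is an assumption.

```python
from datetime import datetime


def parse_watermark(value, property_type):
    """Convert a resume_from string according to property_type."""
    if property_type == "datetime":
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    if property_type == "int":
        return int(value)
    if property_type == "float":
        return float(value)
    if property_type == "string":
        return value
    raise ValueError(f"unknown property_type: {property_type!r}")


def initial_cursor(persisted, resume_from, property_type):
    """Pick the (watermark, tie_breaker) pair a sync starts from, or None."""
    if persisted is not None:
        return persisted  # later runs: continue from the saved tuple
    if resume_from:
        # First run with a seed: no tie-breaker is known yet.
        return (parse_watermark(resume_from, property_type), None)
    return None  # assumption: first run without a seed reads everything
```

For example, initial_cursor(None, "2025-01-01T00:00:00Z", "datetime") yields a timezone-aware datetime paired with None, while a persisted tuple always takes precedence over resume_from.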

Example Request #

Case 1 #

When a query returns a full node object without explicitly projecting its properties (e.g., using RETURN n), the Neo4j connector will automatically expand/serialize those properties. To correctly map the resulting fields, you must reference them using the node alias prefix (e.g., n.propertyName).
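
The expansion described above can be sketched as flattening each node into alias-prefixed keys. This is an assumption about the shape of the result, inferred from the n.propertyName convention; expand_row is a hypothetical helper, and nodes are represented here as plain dictionaries.

```python
def expand_row(row):
    """Flatten node values into "alias.property" keys; keep scalars unchanged.

    Assumption: a node returned as `n` arrives as a dict of its properties.
    """
    flat = {}
    for alias, value in row.items():
        if isinstance(value, dict):
            for key, inner in value.items():
                flat[f"{alias}.{key}"] = inner
        else:
            flat[alias] = value
    return flat


row = {"n": {"title": "Intro", "updated": "2025-01-01T00:00:00Z"}, "score": 0.9}
flat = expand_row(row)
```

After flattening, the node's title is addressed as n.title and its timestamp as n.updated, while the explicitly projected score keeps its own name.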

  • In this case, the datetime property updated serves as the incremental property.
  • elementId(n) serves as the tie-breaker.

curl -H 'Content-Type: application/json' -XPOST "http://localhost:9000/datasource/" -d '
{
  "name": "Neo4j Docs",
  "type": "connector",
  "connector": {
    "id": "neo4j",
    "config": {
      "connection_uri": "neo4j+s://graph.example.com:7687",
      "username": "coco_sync",
      "password": "changeme",
      "database": "neo4j",
      "cypher": "MATCH (n:Document) RETURN n",
      "parameters": {
        "limit": 1000
      },
      "pagination": true,
      "page_size": 500,
      "incremental": {
        "enabled": true,
        "mode": "property_watermark",
        "property": "n.updated",
        "property_type": "datetime",
        "tie_breaker": "elementId(n)"
      },
      "field_mapping": {
        "enabled": true,
        "mapping": {
          "id": "id",
          "title": "title",
          "content": "content",
          "updated": "updated"
        }
      }
    }
  }
}'

The final Cypher query string will look like this:

MATCH (n:Document)
WITH 
    n,
    n.updated AS coco_property, // The primary incremental field (timestamp)
    elementId(n) AS coco_tie    // The secondary incremental field (unique string ID)

// Filtering Logic: Retrieve all documents newer than the cursor time OR 
// documents at the exact cursor time that have a greater elementId.
WHERE coco_property > datetime("2025-10-09T12:59:06.836Z") 
   OR (
       coco_property = datetime("2025-10-09T12:59:06.836Z") 
       AND coco_tie > "4:1093e8af-4022-4813-9534-8c7121ac10d4:33"
   )

// Sorting: Order by time first, then by the unique ID to break ties. 
// This dual-sort structure is essential for robust cursor pagination.
ORDER BY 
    coco_property ASC, 
    coco_tie ASC 

// Limit the results to the next batch.
LIMIT 500 

// Return the full node object and the projected fields.
RETURN n, coco_property, coco_tie
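
To make the structure of that query easier to follow, here is a minimal sketch of how such a statement could be assembled from the datasource settings. It is not the connector's actual implementation: build_incremental_query is a hypothetical function, and it uses the query parameters $cursor_watermark and $cursor_tie (names invented for this example) instead of the inlined literals shown above.

```python
def build_incremental_query(match_clause, return_items, prop, tie,
                            page_size, has_cursor, property_type="datetime"):
    """Assemble a keyset-paginated Cypher statement as a string."""
    # A datetime cursor passed as an RFC3339 string is converted inside Cypher.
    if property_type == "datetime":
        cursor_value = "datetime($cursor_watermark)"
    else:
        cursor_value = "$cursor_watermark"

    lines = [
        match_clause,
        f"WITH {return_items}, {prop} AS coco_property, {tie} AS coco_tie",
    ]
    if has_cursor:
        # Newer watermark, or same watermark with a greater tie-breaker.
        lines.append(
            f"WHERE coco_property > {cursor_value} "
            f"OR (coco_property = {cursor_value} AND coco_tie > $cursor_tie)"
        )
    lines += [
        "ORDER BY coco_property ASC, coco_tie ASC",
        f"LIMIT {int(page_size)}",
        f"RETURN {return_items}, coco_property, coco_tie",
    ]
    return "\n".join(lines)


query = build_incremental_query(
    "MATCH (n:Document)", "n", "n.updated", "elementId(n)",
    page_size=500, has_cursor=True,
)
```

On a first run without a stored cursor, passing has_cursor=False omits the WHERE clause, so the query reads from the beginning in the same sorted order.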

Case 2 #

In this case, the global_version property is the incremental property for :Document nodes. The system that writes the data must increase this property monotonically on every data manipulation operation (CREATE, SET/UPDATE, and DELETE) so that changes to the entity remain globally traceable.

  • global_version, returned by the query as version, serves as the incremental property.
  • elementId(n), returned by the query as eid, serves as the tie-breaker.

curl -H 'Content-Type: application/json' -XPOST "http://localhost:9000/datasource/" -d '
{
  "name": "Neo4j Docs",
  "type": "connector",
  "connector": {
    "id": "neo4j",
    "config": {
      "connection_uri": "neo4j+s://graph.example.com:7687",
      "username": "coco_sync",
      "password": "changeme",
      "database": "neo4j",
      "cypher": "MATCH (n:Document) RETURN n.uuid AS id, n.title AS title, n.content AS content, n.updated AS updated, elementId(n) AS eid, n.global_version AS version",
      "parameters": {
        "limit": 1000
      },
      "pagination": true,
      "page_size": 500,
      "incremental": {
        "enabled": true,
        "mode": "property_watermark",
        "property": "version",
        "property_type": "int",
        "tie_breaker": "eid",
        "resume_from": "1"
      },
      "field_mapping": {
        "enabled": true,
        "mapping": {
          "id": "id",
          "title": "title",
          "content": "content",
          "updated": "updated"
        }
      }
    }
  }
}'

Supported Config Parameters for Neo4j Connector #

| Field | Type | Description |
|---|---|---|
| connection_uri | string | Neo4j Bolt URI, including scheme and port (e.g., neo4j+s://graph.example.com:7687). |
| username / password | string | Credentials for basic authentication. Use auth_token instead for bearer auth. |
| auth_token | string | Optional bearer token. Overrides username/password when provided. |
| database | string | Optional database name. Defaults to the Neo4j default DB. |
| cypher | string | Cypher statement executed for each sync. |
| parameters | object | Optional key/value parameters supplied to the Cypher query. |
| pagination | boolean | Enables driver pagination. Recommended for large result sets. |
| page_size | integer | Number of records fetched per page when pagination is enabled. Defaults to 500. |
| incremental | object | Optional incremental configuration (see table below). |
| field_mapping | object | Optional mapping from result fields to document schema. |
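
Before submitting a datasource, it can help to sanity-check the config locally. The sketch below is a hypothetical pre-flight check, not something coco-server provides. The scheme list reflects what the official Neo4j drivers understand, and whether the connector accepts every one of them is an assumption.

```python
import re
from urllib.parse import urlparse

# URI schemes understood by the official Neo4j drivers (assumed to apply here).
KNOWN_SCHEMES = {"neo4j", "neo4j+s", "neo4j+ssc", "bolt", "bolt+s", "bolt+ssc"}


def check_config(config):
    """Return a list of human-readable problems found in a connector config."""
    problems = []

    uri = urlparse(config.get("connection_uri", ""))
    if uri.scheme not in KNOWN_SCHEMES:
        problems.append(f"unsupported URI scheme: {uri.scheme!r}")
    if not uri.hostname:
        problems.append("connection_uri has no host")

    cypher = config.get("cypher", "")
    if not cypher.strip():
        problems.append("cypher is empty")

    # Naive scan for $name placeholders; it also matches "$" inside string literals.
    referenced = set(re.findall(r"\$([A-Za-z_][A-Za-z0-9_]*)", cypher))
    supplied = set(config.get("parameters") or {})
    for name in sorted(referenced - supplied):
        problems.append(f"query references ${name} but parameters does not define it")
    for name in sorted(supplied - referenced):
        problems.append(f"parameter {name!r} is never referenced by the query")

    if config.get("pagination") and int(config.get("page_size", 500)) <= 0:
        problems.append("page_size must be positive")
    return problems
```

An empty list means none of these checks failed. It does not guarantee that the server will accept the config.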

Incremental Config Fields #

| Field | Type | Description |
|---|---|---|
| enabled | boolean | Turn incremental sync on/off. |
| mode | string | Only property_watermark is currently supported. |
| property | string | Cypher expression returned by your query that acts as the watermark (e.g., n.updated_at). |
| property_type | string | Hint for how to interpret the watermark (datetime, int, float, or string). |
| tie_breaker | string | Cypher expression used to order rows sharing the same watermark (e.g., elementId(n)). |
| resume_from | string | Optional starting watermark for the first run (RFC3339 timestamp for datetime properties). |

With these settings, the Neo4j connector reliably pages through large result sets, resuming from the precise (watermark, tie-breaker) position on each sync.
