Bulk Loading data into JanusGraph — Part 2

Nitin Poddar
Jul 10, 2020

In the last post, we configured and started the JanusGraph server with ConfigurationGraphFactory, defined our schema, and created an index.

In this post, we will first go over some of the configuration options we chose when creating our custom configuration graph. Then we will look at the Apache Spark code that inserts data into JanusGraph in bulk.

Configuration Options

Let’s look at some of the configuration options we chose for our graph.

* Experiment with different options and values to find the best performance for your workload

There are many configuration options available. I highly encourage you to read about them and experiment according to your needs. For the full list of options, see the configuration reference in the JanusGraph documentation.
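To make experimenting easier, here is a minimal sketch of a helper that renders a set of options as properties-file text, so that variants can be generated and compared. The option names are real JanusGraph settings, but the values and the `to_properties` helper are illustrative assumptions, not the configuration used in this post.

```python
def to_properties(options):
    """Render a dict of options as Java-style .properties text."""
    lines = []
    for key, value in options.items():
        # Properties files expect lowercase booleans
        if isinstance(value, bool):
            value = "true" if value else "false"
        lines.append(f"{key}={value}")
    return "\n".join(lines)


# Illustrative values only (assumptions); tune them for your own workload.
bulk_load_options = {
    "storage.backend": "cql",
    "storage.batch-loading": True,   # the option used in the code in this post
    "ids.block-size": 1000000,       # assumed value: reserve larger id blocks
    "storage.buffer-size": 2048,     # assumed value: mutations buffered per write
}

print(to_properties(bulk_load_options))
```

Changing one value at a time and re-running the same load makes it easier to see which option actually moves the numbers.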

Connecting to JanusGraph

JanusGraph supports any language for which a TinkerPop driver exists, such as Java, Python, or C#. I tried both Python and Java and found Java much easier to work with, because JanusGraph can be embedded as a library inside a Java application. For Python there is no JanusGraph library, so you have to work with the gremlin-python distribution. I will go over connectivity with both Java and Python.

Connecting from Java

  1. Add the following dependencies to pom.xml
<dependency>
    <groupId>org.apache.tinkerpop</groupId>
    <artifactId>gremlin-driver</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>janusgraph-core</artifactId>
    <version>0.5.2</version>
</dependency>
<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>janusgraph-driver</artifactId>
    <version>0.5.2</version>
</dependency>
<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>janusgraph-cql</artifactId>
    <version>0.5.2</version>
</dependency>
<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>janusgraph-es</artifactId>
    <version>0.5.2</version>
</dependency>

The JanusGraph documentation does not mention adding the core, cql, and es dependencies, but you will have to add them because we are using Scylla as the storage backend with indexing on vertices and edges.

2. Create the connection using the JanusGraphFactory builder

JanusGraph graph = JanusGraphFactory.build()
        .set("storage.backend", "cql")
        .set("storage.hostname", hostname)
        .set("storage.cql.keyspace", keyspace)
        .set("storage.batch-loading", true)
        .open();

3. Write the Apache Spark bulk load code along these lines

df.foreachPartition(new ForeachPartitionFunction<Row>() {
    @Override
    public void call(Iterator<Row> iterator) {
        try {
            // One graph instance per partition
            JanusGraph graph = JanusGraphFactory.build()
                    .set("storage.backend", "cql")
                    .set("storage.hostname", hostname)
                    .set("storage.cql.keyspace", keyspace)
                    .set("storage.batch-loading", true)
                    .open();

            GraphTraversalSource g = graph.traversal();
            if (!graph.tx().isOpen()) {
                graph.tx().open();
            }
            while (iterator.hasNext()) {

                ...
                ...
                // Create Vertex From (only if it does not exist yet)
                if (!g.V().hasLabel(fromLabelName)
                        .has("idValue", fromIdentifierId)
                        .hasNext()) {

                    vertexFrom = graph.addVertex(fromLabelName);
                    vertexFrom.property("idValue", fromIdentifierId);
                    vertexFrom.property("idType", fromLabelName);
                    vertexFrom.property("timestamp", timestamp);
                } else {
                    vertexFrom = g.V().hasLabel(fromLabelName)
                            .has("idValue", fromIdentifierId)
                            .next();
                }

                // Create Vertex To (only if it does not exist yet)
                if (!g.V().hasLabel(toLabelName)
                        .has("idValue", toIdentifierId)
                        .hasNext()) {

                    vertexTo = graph.addVertex(toLabelName);
                    vertexTo.property("idValue", toIdentifierId);
                    vertexTo.property("idType", toLabelName);
                    vertexTo.property("timestamp", timestamp);
                } else {
                    vertexTo = g.V().hasLabel(toLabelName)
                            .has("idValue", toIdentifierId)
                            .next();
                }

                // Create Edge between the From and To
                if (!g.E().hasLabel("REL")
                        .has("edgeValue", fromIdentifierId + toIdentifierId)
                        .hasNext()) {

                    Edge e = vertexFrom.addEdge("REL", vertexTo);
                    e.property("timestamp", System.currentTimeMillis());
                    e.property("edgeValue", fromIdentifierId + toIdentifierId);
                }
            }

            // Commit once for the whole partition, then release resources
            graph.tx().commit();
            graph.tx().close();
            g.close();
            graph.close();
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }
    }
});

The sample code above uses Spark's foreachPartition to create one graph instance and open one transaction per partition. After inserting the vertices and edges for every row in the partition, it commits the transaction and closes the resources.
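The per-partition lifecycle described above can be sketched on its own, without a JanusGraph cluster. The sketch below is written in Python so that it runs anywhere. The `open_graph` and `write_row` callables, and the `commit`/`rollback`/`close` methods on the graph object, are hypothetical stand-ins for the JanusGraph calls in the Java code, not JanusGraph APIs. It also adds one variation that is my assumption rather than part of the original code: committing every `commit_every` rows so that a very large partition does not end up in a single huge transaction.

```python
def load_partition(rows, open_graph, write_row, commit_every=10000):
    """Open one graph per partition, commit in bounded batches, always close.

    open_graph and write_row are hypothetical callables supplied by the caller.
    """
    graph = open_graph()
    written = 0
    try:
        for row in rows:
            write_row(graph, row)
            written += 1
            if written % commit_every == 0:
                graph.commit()      # flush a bounded batch
        graph.commit()              # commit whatever is left
    except Exception:
        graph.rollback()            # discard the uncommitted batch
        raise                       # surface the failure to the caller
    finally:
        graph.close()               # release resources on success and on failure
    return written
```

Because rows committed before a failure stay in the graph, a retried partition will see them again. The existence checks in the Java code are what would make such a retry safe.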

Connecting from Python

  1. There is no JanusGraph library for Python, so we need to use the gremlin-python package to connect. Install it using the command below.
pip install gremlinpython==3.4.6

2. Write sample code using gremlin-python and connect through the remote driver

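from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

def insertVertexAndEdge(records):
    # 'ig' is the traversal source name defined in the server's Groovy startup script (empty-sample.groovy)
    connection = DriverRemoteConnection('ws://<janusgraph-server>:8182/gremlin', 'ig')
    g1 = Graph().traversal().withRemote(connection)
    for row in records:
        g1.addV('ZYNC_COOKIE') \
            .property('idValue', row["session_id"]) \
            .next()

        ...

    # Close the websocket connection once the partition is done
    connection.close()

df.foreachPartition(insertVertexAndEdge)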

The gremlin-python library does not support opening a transaction that spans multiple queries. Each traversal is executed in its own transaction, which is committed when .next() runs it. This is not going to be very efficient if you have to insert millions of records.
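One way to reduce the per-record overhead is to chain several addV steps into a single traversal, so that a whole batch of records goes to the server in one request. The following is a minimal sketch of that idea, not a measured optimization: `chunked`, `insert_vertices_batched`, and the `batch_size` value are names and numbers I introduce for illustration, and `g` is assumed to be a gremlin-python traversal source like `g1` above.

```python
from itertools import islice


def chunked(rows, size):
    """Yield lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def insert_vertices_batched(g, rows, batch_size=50):
    """Send one chained traversal per batch instead of one traversal per row.

    Returns the number of requests sent. batch_size is an assumed value.
    """
    requests = 0
    for batch in chunked(rows, batch_size):
        first, rest = batch[0], batch[1:]
        # g.addV(...) starts the traversal; later addV steps are chained onto it
        t = g.addV('ZYNC_COOKIE').property('idValue', first["session_id"])
        for row in rest:
            t = t.addV('ZYNC_COOKIE').property('idValue', row["session_id"])
        t.iterate()  # execute the whole batch as a single traversal
        requests += 1
    return requests
```

Inside insertVertexAndEdge, this would replace the per-row loop with a call such as insert_vertices_batched(g1, records). Very long traversals have their own cost on the server, so the batch size is worth tuning rather than maximizing.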

Performance

With the Java-based Apache Spark code, I was able to get write performance of approximately 30,000 qps and read performance of approximately 45,000 qps. Even on relatively small infrastructure, this performance met our SLAs.

References

I started working with JanusGraph recently. Prior to this, I had experience working with Neo4j and Amazon Neptune. Even though JanusGraph is open source, there is not much help available, and most of the articles that do exist cover older versions and are now obsolete. Still, some of these articles and links were very helpful.

Happy Learning

PS: This is my very first article on medium.com. I thoroughly enjoyed writing and sharing this knowledge. Hopefully you will find it useful too :-)
