2017-04-10 107 views
0

我有两个服务器,一个是neo4j存储图形数据,另一个服务器运行ETL,每分钟将数据加载到neo4j。我现在的解决方案是:使用for循环为每个数据项目(基于py2neo)运行事务,但性能非常慢,我也尝试在neo4j本地保存一个tmp csv文件服务器,然后在cypher中使用load csv语法,它会提高性能很多,但我不知道如何从远程服务器加载csv。如何使用python将数据从远程服务器加载到neo4j中?

所以,我想知道的是,如果有一种方法可以将dict/list /(pandas dataframe)加载到neo4j中?就像加载CSV进行批量导入,在Python脚本? 我是neo4j的新手,非常感谢您的帮助。

回答

0

如果要从远程服务器加载CSV,则需要运行一个简单的HTTPServer或类似的HTTPServer上托管文件。然后,你可以简单地使用

LOAD CSV FROM "http://192.x.x.x/myfile.csv" as row

在另一方面,你可以从熊猫数据帧导入文件。我创建一个计算线性回归梯度一个简单的脚本,并将其保存回Neo4j的

from neo4j.v1 import GraphDatabase 
import pandas as pd 
import numpy as np 
driver = GraphDatabase.driver("bolt://192.168.x.x:7687", auth=("neo4j", "neo4j")) 
session = driver.session() 

def weekly_count_gradient(data): 
    df = pd.DataFrame([r.values() for r in data], columns=data.keys()) 
    df["week"] = df.start.apply(lambda x: pd.to_datetime(x).week if pd.notnull(x) else None) 
    df["year"] = df.start.apply(lambda x: pd.to_datetime(x).year if pd.notnull(x) else None) 
    group = df.groupby(["week","year","company"]).start.count().reset_index() 
    for name in group["company"].unique(): 
     if group[group["company"] == name].shape[0] >= 5: 
      x = np.array([i[1] if i[0] == 2016 else i[1] + 52 for i in group[group.company == name][["year","week"]].values]) 
      y = group[group.company == name]["start"].values 
      fit = np.polyfit(x,y,deg=1)  
      update = session.run("MATCH (a:Company{code:{code}}) SET a.weekly_count_gradient = toFLOAT({gradient}) RETURN a.code,{"code":name,"gradient":fit[0]}) 

这里的关键是,你运行一个带参数的查询,参数可以来自任何地方(列表/字典/熊猫)

相关问题