2017-09-13 110 views
1

我试图使用node.js将csv文件导入neo4j。我必须将数据插入多个collection/table,所以我必须使用node.js脚本插入数据。但我的问题是,插入CSV数据时无法防止数据重复。使用node.js在neo4j中导入CSV

样品CSV数据:

name 
------------- 
Afghanistan 
Afghanistan 
Aland 
Albania 
Albania 
Bangladesh 
Bangladesh 

index.js

cp = require('child_process'); 
child = cp.fork(__dirname + "/background-import-csv-file.js"); 
child.on('message', function(msg) { 
    console.log("background-insert-process said : ", msg); 
}); 
file = path.resolve(__dirname, `./file/simplemaps.csv`); 
child.send(file); 

background-import-csv-file.js,我有两种不同的方式来编写代码。

首先无极基于background-import-csv-file.js):

cp = require('child_process'); 
csv = require('fast-csv'); 
Q = require('q'); 
DB = require("./common/driver"); 
Country = require('./collection/country'); 
process.on("message", (file) => { 
    stream = fs.createReadStream(file); 
    csv 
    .fromStream(stream, { headers: true }) 
    .on("data", function(data) { 
     let countryData = { "name": data.name }; 
     neo = new DB(); 
     country = new Country(neo); 
     country.insert(countryData) 
      .then(resp => process.send(resp.msg)) 
      .catch(err => process.send(err)) 
    }) 
    .on("end",() => process.send("file read complete")); 
}); 

./collection/country.js

Q = require('q'); 
    Country = function Country(neo) { 
    this.country = "Country"; this.neo = neo; 
    }; 

    Country.prototype.find = function find(filters) { 
    query = `MATCH (a:Country { name: '${filters.name}' }) RETURN {country:properties(a)}`; 
    return this.neo.run(query, filters).then(resp => resp); 
    } 

    Country.prototype.create = function create(data) { 
    query = `CREATE (ax:Country { name: '${data.name}' }) RETURN ax `; 
    return this.neo.run(query, {}).then(resp => resp[0].properties).catch(err => err) 
    } 

    Country.prototype.insert = function insert(country) { 
     filter = { name: country.name }; 
     return Q(this.find(filter)) 
     .then(resp => resp.length > 0 ? Q.resolve({ msg: `country: [${country.name}] is already exist` }) : Q.resolve(this.create(country)) ) 
    .then(resp => resp) 
    .catch(e => Q.reject(e)); 
    } 

    module.exports = Country; 

./common/driver.js

neo4j = require('neo4j-driver').v1; 
function DB() { 
    this.driver = neo4j.driver(); this.session = this.driver.session(); 
} 

DB.prototype.run = function run(query, data) { 
    return this.session.run(query, data) 
    .then(response => response.records.map(
      record => record._fields[0] ? 
      record._fields.length ? record._fields[0] : {} : {} 
     )).catch(err => new Error(err)); 
} 

module.exports = DB; 

当我在终端运行index.js,在databas e,我有2 Afghanistan,1 Aland,2 Albania和2 Bangladesh。但我需要在我的数据库中有1 Afghanistan,1 Aland,1 Albania和1 Bangladesh。当我分析代码时,发现在插入数据之前,我正在检查数据(Country.prototype.find = function find(filters)),如果它已经存在与否,但它总是返回空结果。这就是为什么它插入多个数据。如果我再次运行index.js,则不会将新数据插入到数据库中。为了解决这个问题,我已经试过以下CQL

MERGE (c:Country { name: '${data.name}' }) RETURN c 

它插入唯一的数据,但它杀了这么多的时间。然后,我写了下面的代码:

事件驱动background-import-csv-file.js):

process.on("message", (file) => { 
    stream = fs.createReadStream(file); 
    csv 
    .fromStream(stream, { headers: true }) 
    .on("data", function(data) { 
     countryData = { "name": data.name }; 
     neo = new DB(); 
     country = new Country(neo); 
     country.find(countryData); 
     country.on('find', resp => resp.length > 0 ? Q.resolve({ msg: `country: [${country.name}] is already exist` }) : Q.resolve(country.create(countryData)) ); 

     country.on('create', resp => console.log(resp)); 
    }) 
    .on("end",() => process.send("file read complete")); 
}); 

./collection/country.js

EventEmitter = require('events').EventEmitter; 
util = require('util'); 

Country = function Country(neo) { 
    this.neo = neo; EventEmitter.call(this); 
}; 
util.inherits(Country, EventEmitter); 

Country.prototype.find = function find(filters) { 
    query = `MATCH (a:Country { name: '${filters.name}' }) RETURN {country:properties(a)}`; 
    return this.neo.run(query, {}).then(resp => this.emit('find', resp)); 
} 

Country.prototype.create = function create(data) { 
    query = `CREATE (ax:Country { name: '${data.name}' }) RETURN ax `; 
    return this.neo.run(query, {}).then(resp => this.emit('create', resp[0].properties)).catch(err => err) 
} 

而这一次,它显示了同样的结果。我错过了什么?任何建议都将非常有用。

注意:我正在使用fast-csv进行csv解析,并使用Q作为承诺。

+0

什么是 “勒贝尔” 是什么意思?我没有看到一个显而易见的理由,为什么用一个简单的Cypher查询就无法做到这一点。 – cybersam

回答

1

我的问题是,在csv文件解析,它是如此之快(事件驱动),它不等待完成插入数据到数据库。所以我必须暂停文件解析然后重新开始。

我解决使用下面的代码我的问题:

承诺基础(背景进口的CSV file.js):

cp = require('child_process'); 
csv = require('fast-csv'); 
Q = require('q'); 
DB = require("./common/driver"); 
Country = require('./collection/country'); 

process.on("message", (file) => { 
    stream = fs.createReadStream(file); 
    csvstream = csv 
    .fromStream(stream, { headers: true }) 
    .on("data", function(data) { 
     csvstream.pause(); // pause the csv file parsing 
     countryData = { "name": data.name }; 
     neo = new DB(); 
     country = new Country(neo); 
     country.insert(countryData) 
     .then(resp => { 
      process.send(resp.msg); 
      neo.close(); 
      return csvstream.resume(); // after completing db process, resume 
     }) 
     .catch(err => { 
      process.send(err); 
      return csvstream.resume(); // if failed, then resume 
      }); 
    }) 
    .on("end",() => process.send("file read complete")); 
}); 
1

其实我可以想象以下解决方案:

  1. 修改CSV文件本身的编程语言(如node.js中)具有相同名称中删除重复行。
  2. 添加的Neo4j unique constrains CREATE CONSTRAINT ON (c:Country) ASSERT c.name IS UNIQUE
  3. 涉及中间件,像排队要防止重复项,对于这一点,你需要定义你自己的消息结构和重复运算。

以上。