0

我必须使用Google Dataflow Process中的Google Cloud函数逐个触发多个模板。执行完一个模板后,必须调用另一个模板。以同步的方式通过云功能触发多数据流作业?

const google = require('googleapis'); 
exports.goWithTheDataFlow = function(event, callback) { 
const file = event.data; 
if (file.resourceState === 'exists' && file.name) { 
    google.auth.getApplicationDefault(function (err, authClient, projectId) { 
    if (err) { 
     throw err; 
    } 
    if (authClient.createScopedRequired && authClient.createScopedRequired()) { 
     authClient = authClient.createScoped([ 
     'https://www.googleapis.com/auth/cloud-platform', 
     'https://www.googleapis.com/auth/userinfo.email' 
     ]); 
    } 

    const dataflow = google.dataflow({ version: 'v1b3', auth: authClient }); 
    dataflow.projects.templates.create({ 
     projectId: 'testing1-180111', 
     resource: { 
     parameters: { 
     }, 
     jobName: 'cloud-fn-dataflow-test', 
     gcsPath: 'gs://kishan-configuration/templates/FinalConfigTable' 
     } 
    }, function(err, response) { 
     if (err) { 
     console.error("problem running dataflow template, error was: ", err); 
     } 
     console.log("Dataflow template response: ", response); 
     callback(); 
    }); 

    }); 
} 
}; 

package.json文件的代码是这样

{ 
    "name": "kishan_kumar464", 
    "version": "1.0.0", 
    "main": "index.js", 
    "dependencies": { 
    "google-cloud": "^0.56.0", 
    "googleapis": "^22.2.0" 
    }, 
    "devDependencies": {}, 
    "scripts": { 
    "test": "echo \"Error: no test specified\" && exit 1" 
    }, 
    "author": "Kishan", 
    "license": "ISC", 
    "description": "" 
} 
+0

我认为你正在寻找工作流管理系统。 看看luigi,Airflow等等的目的 – Anuj

+0

不,我没有可以U请分享LINK,我也在尝试使用Pub/Sub我会在队列中放置一条消息来触发下一个管道 - 使用云端pubsub。 – BackBenChers

+0

Pub/Sub也失败了,因为我们执行PubSub来启动一个新函数在结束我之前调用函数Pipeline.Doesn't重要,当我们呼吁它。我必须使用模板运行我的工作,所以图必须以这种方式定义。 – BackBenChers

回答

0

如果我理解正确的话,你想从模板创建工作,等到完成。之后,您可以从模板创建下一个作业。正确?

我想你也许可以轮询作业状态创建作业后(作业ID为dataflow.projects.templates.create的响应。

要检查作业的状态,请使用项目。 jobs.get API。

+0

我会在队列中放置一条消息来触发下一个管道 - 云端pubsub对此非常有用,它可以触发云端功能。但我会尝试你的方式可以提供给我一些例子 – BackBenChers

+0

我同意你通过pubsub发送“job X finished”通知会更好。但是现在数据流不支持。 –

+0

我没有一个直接的例子,但既然你已经可以调用dataflow API,你可以做的是:(1)使用dataflow.projects.jobs.get API来获得一个Job对象。 (2)检查job.currentState字段。如果状态为JOB_STATE_DONE,则继续下一项工作。 (3)如果作业仍在运行或失败,则相应处理。 –