2015-10-15 67 views
1

我正在使用Nodejs作为后端。我试过this npm包创建一个简单的工作流程(AMAZON-SWF)。该软件包有一个示例文件夹,其中包含我放在我的节点项目中的文件,以便我了解其工作原理。为什么descisionTask没有从AWS-SWF服务(SWF)接收任何任务?

问题是Decider没有收到来自SWF服务器的任何任务。因为我的工作流程从未运行。是否有一些配置问题。请指出我所做的错误。

以下是用于快速参考的代码。代码唯一的变化就是版本号的变化和域名的变化。否则,它与代码相同,您可以找到here

以下是决定代码。

var swf = require('./index'); 

var myDecider = new swf.Decider({ 
    "domain": "test-domain", 
    "taskList": {"name": "my-workflow-tasklist"}, 
    "identity": "Decider-01", 
    "maximumPageSize": 100, 
    "reverseOrder": false // IMPORTANT: must replay events in the right order, ie. from the start 
}); 

myDecider.on('decisionTask', function (decisionTask) { 

    console.log("Got a new decision task !"); 

    if(!decisionTask.eventList.scheduled('step1')) { 
     decisionTask.response.schedule({ 
      name: 'step1', 
      activity: 'simple-activity' 
     }); 
    } 
    else { 
     decisionTask.response.stop({ 
      result: "some workflow output data" 
     }); 
    } 

    decisionTask.response.respondCompleted(decisionTask.response.decisions, function(err, result) { 

     if(err) { 
      console.log(err); 
      return; 
     } 

     console.log("responded with some data !"); 
    }); 

}); 

myDecider.on('poll', function(d) { 
    //console.log(_this.config.identity + ": polling for decision tasks..."); 
    console.log("polling for tasks...", d); 
}); 

// Start polling 
myDecider.start(); 



/** 
* It is not recommanded to stop the poller in the middle of a long-polling request, 
* because SWF might schedule an DecisionTask to this poller anyway, which will obviously timeout. 
* 
* The .stop() method will wait for the end of the current polling request, 
* eventually wait for a last decision execution, then stop properly : 
*/ 
process.on('SIGINT', function() { 
    console.log('Got SIGINT ! Stopping decider poller after this request...please wait...'); 
    myDecider.stop(); 
}); 

以下是活动码:

/** 
* This simple worker example will respond to any incoming task 
* on the 'my-workflow-tasklist, by setting the input parameters as the results of the task 
*/ 

var swf = require('./index'); 

var activityPoller = new swf.ActivityPoller({ 
    domain: 'test-domain-newspecies', 
    taskList: { name: 'my-workflow-tasklist' }, 
    identity: 'simple-activity' 
}); 

activityPoller.on('error',function() { 
    console.log('error'); 
}); 

activityPoller.on('activityTask', function(task) { 
    console.log("Received new activity task !"); 
    var output = task.input; 

    task.respondCompleted(output, function (err) { 

     if(err) { 
      console.log(err); 
      return; 
     } 

     console.log("responded with some data !"); 
    }); 
}); 


activityPoller.on('poll', function(d) { 
    console.log("polling for activity tasks...", d); 
}); 

activityPoller.on('error', function(error) { 
    console.log(error); 
}); 


// Start polling 
activityPoller.start(); 


/** 
* It is not recommanded to stop the poller in the middle of a long-polling request, 
* because SWF might schedule an ActivityTask to this poller anyway, which will obviously timeout. 
* 
* The .stop() method will wait for the end of the current polling request, 
* eventually wait for a last activity execution, then stop properly : 
*/ 
process.on('SIGINT', function() { 
    console.log('Got SIGINT ! Stopping activity poller after this request...please wait...'); 
    activityPoller.stop(); 
}); 

以下是哪些寄存器的代码:

var awsswf = require('./index'); 
var swf = awsswf.createClient(); 
/** 
* Register the domain "test-domain" 
*/ 
swf.registerDomain({ 
    name: "test-domain-newspecies", 
    description: "this is a just a test domain", 
    workflowExecutionRetentionPeriodInDays: "3" 
}, function (err, results) { 

    if (err && err.code != 'DomainAlreadyExistsFault') { 
     console.log("Unable to register domain: ", err); 
     return; 
    } 
    console.log("'test-domain-newspecies' registered !") 


    /** 
    * Register the WorkflowType "simple-workflow" 
    */ 
    swf.registerWorkflowType({ 
     domain: "test-domain-newspecies", 
     name: "simple-workflow", 
     version: "2.0" 
    }, function (err, results) { 

     if (err && err.code != 'TypeAlreadyExistsFault') { 
      console.log("Unable to register workflow: ", err); 
      return; 
     } 
     console.log("'simple-workflow' registered !") 

     /** 
     * Register the ActivityType "simple-activity" 
     */ 
     swf.registerActivityType({ 
      domain: "test-domain-newspecies", 
      name: "simple-activity", 
      version: "2.0" 
     }, function (err, results) { 

      if (err && err.code != 'TypeAlreadyExistsFault') { 
       console.log("Unable to register activity type: ", err); 
       return; 
      } 

      console.log("'simple-activity' registered !"); 
     }); 

    }); 

}); 

以下是其中启动工作流执行的代码:

var swf = require('./index'); 

var workflow = new swf.Workflow({ 
    "domain": "test-domain-newspecies", 
    "workflowType": { 
     "name": "simple-workflow", 
     "version": "2.0" 
    }, 
    "taskList": { "name": "my-workflow-tasklist" }, 
    "executionStartToCloseTimeout": "1800", 
    "taskStartToCloseTimeout": "1800", 
    "tagList": ["example"], 
    "childPolicy": "TERMINATE" 
}); 


var workflowExecution = workflow.start({ input: "any data ..."}, function (err, runId) { 

    if (err) { console.log("Cannot start workflow : ", err); return; } 

    console.log("Workflow started, runId: " +runId); 

}); 

以下是index.js文件

var basePath = "../node_modules/aws-swf/lib/"; 
exports.AWS = require('aws-swf').AWS; 
exports.AWS.config.loadFromPath(__dirname + '/../config/awsConfig.json'); 
exports.createClient = require(basePath+"swf").createClient; 
exports.Workflow = require(basePath+"workflow").Workflow; 
exports.WorkflowExecution = require(basePath+"workflow-execution").WorkflowExecution; 
exports.ActivityPoller = require(basePath+"activity-poller").ActivityPoller; 
exports.ActivityTask = require(basePath+"activity-task").ActivityTask; 
exports.Decider = require(basePath+"decider").Decider; 
exports.DecisionTask = require(basePath+"decision-task").DecisionTask; 
exports.EventList = require(basePath+"event-list").EventList; 
exports.DecisionResponse = require(basePath+"decision-response").DecisionResponse; 
exports.Poller = require(basePath+"poller").Poller; 

运行此代码的方式是同时打开三个终端。然后我在各自的终端执行以下命令。

activity 
node <activity-file-name> 

decider 
node <decider-file-name> 

start and register I run in the same terminal. 
node <register-file-name> 
node <start-file-name> 

回答

2

它代表的是在决胜局您使用"test-domain",但在代码的其余部分使用的是"test-domain-newspecies"

如果域"test-domain"未注册,则在轮询决策任务时应得到UnknownResourceFault错误。

+1

顺便说一句,这个问题在SWF中很常见 - 你经常有不同的进程,必须保持配置在所有的配置中同步。 – Kobi

+0

这解决了我的决策任务的问题 - 非常感谢。我仍然不知道为什么决策任务不能开始简单活动。当我登录AWS控制台时,我得到这个“ScheduleActivityTaskFailed”,因为它正在调用其版本为1.0的活动(现在已弃用)。它应该在2.0版本中被称为活动。我在哪里提到它应该使用2.0。 **我不明白的是,SWF如何知道要调用哪个版本,因为活动轮询器和决策者轮询器都不会让您提及该版本。** – nsp

+0

想通了。代码中的一些错误。采取静态版本1.0 http://neyric.github.io/aws-swf/apidoc/decision-response.js.html行号。 169 – nsp

相关问题