2017-10-07 111 views
0

我想调用的RxJS方式AWS lambda函数:如何调用AWS LAMBDA与Observable.bindNodeCallback(lambda.invoke)

invokeLambda(): Observable<string> { 
    const lambda = new AWS.Lambda({region: environment.region, apiVersion: "2015-03-31"}); 
    const invoke$ = (functionName, payload, invocationType = "RequestResponse") => { 
    return Observable.bindNodeCallback(lambda.invoke)(); 
}; 

return invoke$(environment.functionName, {}).map((result: InvocationResponse) => JSON.parse(<string>result.Payload)); 

}

但是当我尝试:

this.myService.invokeLambda().subscribe(() => { dosomething(); } 

我得到的错误:

page.html:5 ERROR TypeError: this.makeRequest is not a function 
at svc.(anonymous function) 

我做错了什么?

+2

似乎上下文不绑定,你试过'bindNodeCallback(lambda.invoke.bind(lambda))...'? –

+0

像一个迷人的工作。谢谢 –

回答

0

我建议创建一个门面以将http请求分发到作为AWS Lambda代理的Amazon API网关。您可以将AWS SDK事件处理程序包装为可观察值。在以下示例中,我使用rxjs搜索身份池名称,并确定名称是否存在于我的AWS账户内的Amazon Cognito联合身份身份池列表中。我将Lambda函数与AWS Step Functions集成以增强rxjs“下一个”通道功能。 Rxjs,AWS Lambda和AWS Step Functions重试任务状态定义允许我通过分配cognitoidentity.listIdentityPools名称来“继续”Amazon Cognito联合身份标识池名称存在搜索。 Step Functions在使用rxjs时可以递归执行lambda函数。通过将第一次执行lambda函数返回的分页标记作为参数返回到下一个通过step函数状态递归执行lambda函数。即

exports.handler = (event, context, callback) => { 

    AWS.config.region = "..."; 

    AWS.config.apiVersions = { 
    cognitoidentity: '2014-06-30' 
    }; 

    var cognitoidentity = new AWS.CognitoIdentity(); 

    var params = { 
    MaxResults: 1, 
    get NextToken() { 
     if (event.NextToken == null || undefined) { 
     return null; 
     } else { 
     return event.NextToken; 
     } 
    } 
    }; 

    var eventResult = { 
    identityPoolName: '', 
    identityPoolId: '', 
    NextToken: '', 
    NextState: '', 
    error: '', 
    errorReason: '' 
    }; 

function listIdentityPoolsObservable(params) { 
    return Rx.Observable.create(observer => { 

    cognitoidentity.listIdentityPools(params) 
     .on('success', function(response) { 
     observer.next(response.data); 
     }) 
     .on('error', function(error, response) { 
     observer.error(error); 
     }) 
     .on('complete', function(response) { 
     if (response.error) { 
      observer.error(response.error) 
     } else { 
      observer.complete(response); 
     } 
     }).send(); 
    }); 
    }; 

    const source$ = listIdentityPoolsObservable(params) 
    .share() 
    .observeOn(Rx.Scheduler.asap); 

    const identityPoolsSource$ = source$.map(x => { 
    return x.IdentityPools; 
    }) 
    .flatMap(x => { 
    return x; 
    }) 
    .filter(x => x.IdentityPoolName === event.identityPoolName) 
    .map(x => { 
    if (x.IdentityPoolName === event.identityPoolName) { 
     var dataArr = [x.IdentityPoolName, x.IdentityPoolId]; 
     return dataArr; 
    } 
    }) 
    .defaultIfEmpty(false); 

    const nextTokenSource$ = source$ 
    .filter(x => x.NextToken != null || undefined) 
    .map(x => { 
     if (x.NextToken != null || undefined) { 
     return x.NextToken; 
     } 
    }) 
    .defaultIfEmpty(false); 

    var identityAndToken = Rx.Observable 
    .forkJoin(identityPoolsSource$, nextTokenSource$) 
    .subscribe(x => { 
     //conditional statements... 
     callback(null, eventResult); 
    }); 

    function ExceptionExistence(eventResult) { 
    this.name = eventResult.errorName; 
    this.errorReason = eventResult.errorReason; 
    }; 
    ExceptionExistence.prototype = new Error(); 

};