2016-12-14 45 views
2

我正在尝试我的手在scala + Akka,我试图找出容错。我有一位演员从主管接收消息并将数据插入到数据库中。监督员在遇到故障时重新启动参与者。Akka容错方法

我正在更改postRestart()中的连接字符串,以防连接问题到达数据库。现在,只要有一个数据库存在连接问题,演员就会重新启动并开始将数据插入另一个数据库。

这是一个很好的方法吗?什么是推荐的方法?

主管:

class SocialSupervisor extends Actor { 

    override val supervisorStrategy=OneForOneStrategy(loggingEnabled = false){ 
    case (e:Exception)=>Restart 
    } 

    val post_ref=context.actorOf(Props[Post]) 
    def receive={ 
      case Get_Feed(feed)=>{ 
       //get data from feed 
       post_ref!Post_Message(posted_by,post) 
      } 
    } 
} 

演员:

class Post extends Actor{ 
    val config1=ConfigFactory.load() 
    var config=config1.getConfig("MyApp.db") 

    override def postRestart(reason: Throwable) { 
     config=config1.getConfig("MyApp.backup_db") 
     super.postRestart(reason) 
    } 

    def insert_data(commented_by:String,comment:String){ 
      val connection_string=config.getString("url") 
       val username=config.getString("username") 
       val password=config.getString("password") 
       //DB operations 
    } 

    def receive={ 
     case Post_Message(posted_by,message)=>{ 
     insert_data(posted_by, message) 
     } 
    } 
} 

回答

1

我觉得有几个改进,你可以使你的代码,使之更加 “容错”。

模块化

你或许应该从演员的其余部分,以便它可以被用来测试&独立于任何ActorSystem的分开你的insert_data功能。您的参与者应代码非常少,在他们和receive方法应该主要是调度员到外部函数:

object Post { 
    def insert_data(conn : Connection)(commented_by : String, comment : String) = { 
    ... 
    } 
} 

你甚至可以走了一步,取出Connection依赖。从演员的角度插入无非是发生在一个PostMessage,并返回有效行更新次数的函数更多:

object Post { 
    //returns an Int because Statement.executeUpdate returns an Int 
    type DBInserter : Post_Message => Int 

现在,您可以插入到数据库连接之前:

def insertIntoLiveDB(connFactory :() => Connection) : DBInserter = 
    (postMessage : Post_Message) => { 
     val sqlStr = s"INSERT INTO .." 
     connFactory().createStatement() executeUpdate sqlStr 
    } 
    } 

或者编写一个函数,从来不会插入用于测试目的:

//does no inserting 
    val neverInsert : DBInserter = (postMessage : Post_Message) => 0 
} 

现在你的演员几乎没有逻辑:

class Post(inserter : Post.DBInserter) extends Actor { 

    def receive = { 
    case pm : Post_Message => inserter(pm) 
    } 

} 

容错

到目前为止,“故障”的应用程序中的最大来源是网络,由Connection你的情况表现到数据库。我们需要某种方式让连接在发生故障时自动刷新。我们可以用一个工厂函数来进行:

def basicConnFactory(timeoutInSecs : Int = 10) = { 

    //setup initial connection, not specified in question 
    var conn : Connection = ??? 

() => { 
    if(conn isValid timeoutInSecs) 
     conn 
    else { 
     conn = ??? //setup backup connection 
     conn 
    } 
    } 
} 

现在连接的有效性在每次插入测试和重新确立,如果出现了问题。然后,该工厂可用于创建演员:

import Post.queryLiveDB 
val post_ref = 
    context actorOf (Props[Post], insertIntoLiveDB(basicConnFactory())) 

当你的生产要求得到更加严格,你可以ammend工厂利用connection pool ...