2012-04-15 56 views
1

我很新的扭曲,我试图使用试用测试框架编写一些单元测试。我的测试按预期运行并通过,但由于某些原因,测试之间会进行审判。我必须在每次测试后打到CTRL + C以使其进入下一个测试。我猜我有一些配置不正确,或者我没有调用某种方法,我应该告诉试验测试完成。扭曲的试验在测试之间挂起

下面是测试类:

from twisted.internet import reactor, defer 
import threading 
import time 


class SomeClass: 
    def doSomething(self): 
     return self.asyncMethod() 

    def asyncMethod(self): 
     d = defer.Deferred() 
     t = SomeThread(d) 
     t.start() 
     return d 


class SomeThread(threading.Thread): 
    def __init__(self, d): 
     super(SomeThread, self).__init__() 
     self.d = d 

    def run(self): 
     time.sleep(2) # pretend to do something 
     retVal = 123 
     self.d.callback(retVal) 

下面是单元测试类:

from twisted.trial import unittest 
import tested 


class SomeTest(unittest.TestCase): 
    def testOne(self): 
     sc = tested.SomeClass() 
     d = sc.doSomething() 
     return d.addCallback(self.allDone) 

    def allDone(self, retVal): 
     self.assertEquals(retVal, 123) 

    def testTwo(self): 
     sc = tested.SomeClass() 
     d = sc.doSomething() 
     return d.addCallback(self.allDone2) 

    def allDone2(self, retVal): 
     self.assertEquals(retVal, 123) 

这是命令行输出的样子:

me$ trial test.py 
test 
    SomeTest 
    testOne ... ^C               [OK] 
    testTwo ... ^C               [OK] 

------------------------------------------------------------------------------- 
Ran 2 tests in 8.499s 

PASSED (successes=2) 
+1

你为什么使用'threading'?你可能应该使用['deferToThread'](http://twistedmatrix.com/documents/current/api/twisted.internet.threads.deferToThread.html) – SingleNegationElimination 2012-04-15 03:29:50

+0

这个问题似乎与睡眠有关线程。如果我离开穿线并将其睡觉,它就会起作用。我也通过完全取出线程并使用reactor.callLater()来代替它。任何人都可以解释为什么线程会导致此问题?我只是想模拟像数据库查询这样的长操作。 – d512 2012-04-15 05:26:19

+0

我尝试使用deferToThread(),那也工作得很好。感谢指针。 – d512 2012-04-15 05:34:46

回答

1

猜想你的问题与你的线程有关。扭曲不是线程安全的,如果你需要与线程交互,你应该让反应堆通过使用deferToThread,callInThread,callFromThread来处理事情。 有关如何使用Twisted进行线程安全的信息,请参阅here