如果Http连接失败(例如服务器关闭),我需要递减计数器。
我打算说“地狱是的”,但我在这句话后有点不太确定。我想你想要做这样的事情:
def sendRequest(url)
request = new request to url
request.header["X-count"] = next serial
if request.send() != SUCCESS
rewind serial
在这种情况下,我猜想,两个线程不应该被允许同时发送请求,然后你想要的东西,序列化的要求,而不是一个AtomicInteger
,这实际上只是让你执行一些原子操作。如果两个线程都同时调用sendRequest
,而第一个会失败,会发生这种情况:
Thread | What happens?
--------+-------------------------
A | Creates new request
B | Creates new request
A | Set request["X-count"] = 0
A | Increment counter to 1
A | Send request
B | Set request["X-count"] = 1
B | Increment counter to 2
B | Send request
A | Request fails
B | Request succeeds
A | Rewind counter down to 1
C | Creates new request
C | Set request["X-count"] = 1
C | Increment counter to 2
而现在,你已经派了两个请求,与X-数= 1。如果你想避免这种情况,你应该使用类似(假设Request
和Response
是用于处理请求的URL类别):
class SerialRequester {
private volatile int currentSerial = 0;
public synchronized Response sendRequest(URL url) throws SomeException {
Request request = new Request(url);
request.setHeader("X-count", currentSerial);
Response response = request.send();
if (response.isSuccess()) ++currentSerial;
return response;
}
}
此类保证没有两个成功的请求(通过相同SerialRequester
制造)具有相同的X-计数值。
编辑很多人似乎都担心上述解决方案不能同时运行。它没有。这是正确的。但它需要以这种方式来解决OP的问题。现在,如果在请求失败时不需要减少计数器,那么AtomicInteger
就是完美的,但在这种情况下它不正确。
编辑2我在我这里写了一个不容易冻结的串行请求者(比如上面的那个),这样如果它们挂起太久,它会中止请求(例如,在工作线程中排队但没有开始)。因此,如果管道阻塞并且一个请求挂起很长时间,其他请求将最多等待一段固定时间,所以队列不会无限增长,直到阻塞消失。
class SerialRequester {
private enum State { PENDING, STARTED, ABORTED }
private final ExecutorService executor =
Executors.newSingleThreadExecutor();
private int currentSerial = 0; // not volatile, used from executor thread only
public Response sendRequest(final URL url)
throws SomeException, InterruptedException {
final AtomicReference<State> state =
new AtomicReference<State>(State.PENDING);
Future<Response> result = executor.submit(new Callable<Response>(){
@Override
public Result call() throws SomeException {
if (!state.compareAndSet(State.PENDING, State.STARTED))
return null; // Aborted by calling thread
Request request = new Request(url);
request.setHeader("X-count", currentSerial);
Response response = request.send();
if (response.isSuccess()) ++currentSerial;
return response;
}
});
try {
try {
// Wait at most 30 secs for request to start
return result.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e){
// 30 secs passed; abort task if not started
if (state.compareAndSet(State.PENDING, State.ABORTED))
throw new SomeException("Request queued too long", e);
return result.get(); // Task started; wait for completion
}
} catch (ExecutionException e) { // Network timeout, misc I/O errors etc
throw new SomeException("Request error", e);
}
}
}
但是我的2克拉,你应该注意,其中具有较高计数的HTTP请求是一个与之前的较低值发送的情况下(如情况下,其中的低线计数HTTP请求结束睡眠) – notnoop 2009-11-04 07:29:42
notnoop是正确的。如果你想在服务器端进行预定义排序(例如,发生 - 之前),你必须实现某种Lamport时钟。 – 2009-11-04 15:40:12