2011-10-10 91 views
16

我试图将F#中的简单异步TCP服务器移植到C#4中。服务器接收连接,读取单个请求并在关闭连接之前回传一系列响应。WCF性能,延迟和可伸缩性

在C#4中的异步看起来乏味和容易出错,所以我想我会尝试使用WCF来代替。这台服务器不大可能在野外看到1,000个同时发生的请求,所以我认为吞吐量和延迟是有趣的。

我在C#中编写了一个最小双工WCF Web服务和控制台客户端。尽管我使用的是WCF而不是原始套接字,但这已经是175行代码,而原始代码只有80行。但我更关心的性能和可扩展性:

  • 延迟是154 ×与WCF更糟。
  • 吞吐量为54 ×与WCF差。
  • TCP处理1000个并发连接,很容易,但WCF扼流圈只是20

首先,我使用的是默认设置一切,所以我想知道如果有什么我可以调整,以改善这些性能数据?

其次,我想知道是否有人使用WCF这种事情,或者如果它是这个工作的错误工具?

下面是在C#中我的WCF服务器:

IService1.cs

[DataContract] 
public class Stock 
{ 
    [DataMember] 
    public DateTime FirstDealDate { get; set; } 
    [DataMember] 
    public DateTime LastDealDate { get; set; } 
    [DataMember] 
    public DateTime StartDate { get; set; } 
    [DataMember] 
    public DateTime EndDate { get; set; } 
    [DataMember] 
    public decimal Open { get; set; } 
    [DataMember] 
    public decimal High { get; set; } 
    [DataMember] 
    public decimal Low { get; set; } 
    [DataMember] 
    public decimal Close { get; set; } 
    [DataMember] 
    public decimal VolumeWeightedPrice { get; set; } 
    [DataMember] 
    public decimal TotalQuantity { get; set; } 
} 

[ServiceContract(CallbackContract = typeof(IPutStock))] 
public interface IStock 
{ 
    [OperationContract] 
    void GetStocks(); 
} 

public interface IPutStock 
{ 
    [OperationContract] 
    void PutStock(Stock stock); 
} 

Service1.svc

<%@ ServiceHost Language="C#" Debug="true" Service="DuplexWcfService2.Stocks" CodeBehind="Service1.svc.cs" %> 

Service1.svc.cs

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)] 
public class Stocks : IStock 
{ 
    IPutStock callback; 

    #region IStock Members 
    public void GetStocks() 
    { 
    callback = OperationContext.Current.GetCallbackChannel<IPutStock>(); 
    Stock st = null; 
    st = new Stock 
    { 
     FirstDealDate = System.DateTime.Now, 
     LastDealDate = System.DateTime.Now, 
     StartDate = System.DateTime.Now, 
     EndDate = System.DateTime.Now, 
     Open = 495, 
     High = 495, 
     Low = 495, 
     Close = 495, 
     VolumeWeightedPrice = 495, 
     TotalQuantity = 495 
    }; 
    for (int i=0; i<1000; ++i) 
     callback.PutStock(st); 
    } 
    #endregion 
} 

Web.config

<?xml version="1.0"?> 
<configuration> 
    <system.web> 
    <compilation debug="true" targetFramework="4.0" /> 
    </system.web> 
    <system.serviceModel> 
    <services> 
     <service name="DuplexWcfService2.Stocks"> 
     <endpoint address="" binding="wsDualHttpBinding" contract="DuplexWcfService2.IStock"> 
      <identity> 
      <dns value="localhost"/> 
      </identity> 
     </endpoint> 
     <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange"/> 
     </service> 
    </services> 
    <behaviors> 
     <serviceBehaviors> 
     <behavior> 
      <serviceMetadata httpGetEnabled="true"/> 
      <serviceDebug includeExceptionDetailInFaults="true"/> 
     </behavior> 
     </serviceBehaviors> 
    </behaviors> 
    <serviceHostingEnvironment multipleSiteBindingsEnabled="true" /> 
    </system.serviceModel> 
    <system.webServer> 
    <modules runAllManagedModulesForAllRequests="true"/> 
    </system.webServer> 
</configuration> 

这里是C#WCF客户端:

Program.cs

[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)] 
class Callback : DuplexWcfService2.IStockCallback 
{ 
    System.Diagnostics.Stopwatch timer; 
    int n; 

    public Callback(System.Diagnostics.Stopwatch t) 
    { 
    timer = t; 
    n = 0; 
    } 

    public void PutStock(DuplexWcfService2.Stock st) 
    { 
    ++n; 
    if (n == 1) 
     Console.WriteLine("First result in " + this.timer.Elapsed.TotalSeconds + "s"); 
    if (n == 1000) 
     Console.WriteLine("1,000 results in " + this.timer.Elapsed.TotalSeconds + "s"); 
    } 
} 

class Program 
{ 
    static void Test(int i) 
    { 
    var timer = System.Diagnostics.Stopwatch.StartNew(); 
    var ctx = new InstanceContext(new Callback(timer)); 
    var proxy = new DuplexWcfService2.StockClient(ctx); 
    proxy.GetStocks(); 
    Console.WriteLine(i + " connected"); 
    } 

    static void Main(string[] args) 
    { 
    for (int i=0; i<10; ++i) 
    { 
     int j = i; 
     new System.Threading.Thread(() => Test(j)).Start(); 
    } 
    } 
} 

这里是我的异步TCP客户端和服务器端的代码在F#:

type AggregatedDeals = 
    { 
    FirstDealTime: System.DateTime 
    LastDealTime: System.DateTime 
    StartTime: System.DateTime 
    EndTime: System.DateTime 
    Open: decimal 
    High: decimal 
    Low: decimal 
    Close: decimal 
    VolumeWeightedPrice: decimal 
    TotalQuantity: decimal 
    } 

let read (stream: System.IO.Stream) = async { 
    let! header = stream.AsyncRead 4 
    let length = System.BitConverter.ToInt32(header, 0) 
    let! body = stream.AsyncRead length 
    let fmt = System.Runtime.Serialization.Formatters.Binary.BinaryFormatter() 
    use stream = new System.IO.MemoryStream(body) 
    return fmt.Deserialize(stream) 
} 

let write (stream: System.IO.Stream) value = async { 
    let body = 
    let fmt = System.Runtime.Serialization.Formatters.Binary.BinaryFormatter() 
    use stream = new System.IO.MemoryStream() 
    fmt.Serialize(stream, value) 
    stream.ToArray() 
    let header = System.BitConverter.GetBytes body.Length 
    do! stream.AsyncWrite header 
    do! stream.AsyncWrite body 
} 

let endPoint = System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 4502) 

let server() = async { 
    let listener = System.Net.Sockets.TcpListener(endPoint) 
    listener.Start() 
    while true do 
    let client = listener.AcceptTcpClient() 
    async { 
     use stream = client.GetStream() 
     let! _ = stream.AsyncRead 1 
     for i in 1..1000 do 
     let aggregatedDeals = 
      { 
      FirstDealTime = System.DateTime.Now 
      LastDealTime = System.DateTime.Now 
      StartTime = System.DateTime.Now 
      EndTime = System.DateTime.Now 
      Open = 1m 
      High = 1m 
      Low = 1m 
      Close = 1m 
      VolumeWeightedPrice = 1m 
      TotalQuantity = 1m 
      } 
     do! write stream aggregatedDeals 
    } |> Async.Start 
} 

let client() = async { 
    let timer = System.Diagnostics.Stopwatch.StartNew() 
    use client = new System.Net.Sockets.TcpClient() 
    client.Connect endPoint 
    use stream = client.GetStream() 
    do! stream.AsyncWrite [|0uy|] 
    for i in 1..1000 do 
    let! _ = read stream 
    if i=1 then lock stdout (fun() -> 
     printfn "First result in %fs" timer.Elapsed.TotalSeconds) 
    lock stdout (fun() -> 
    printfn "1,000 results in %fs" timer.Elapsed.TotalSeconds) 
} 

do 
    server() |> Async.Start 
    seq { for i in 1..100 -> client() } 
    |> Async.Parallel 
    |> Async.RunSynchronously 
    |> ignore 
+4

我想尝试的第一件事就是切换WCF从wsdualhttp结合nettcp(禁用安全)的东西更具有可比性。 – Brian

+0

又见也许http://www.devproconnections.com/article/net-framework2/concurrency-and-throttling-configurations-for-wcf-services – Brian

+0

好问题。我有你的C#服务在本地运行,但我无法让F#编译进行性能比较。我从来没有读过F#之前...我需要添加什么来使它编译超越剪切和粘贴上面的代码? – ErnieL

回答

5

要回答你的第二个问题首先,与之相比,WCF总是会有开销原始插座。但它有一吨的功能(如安全,reliablity,互操作性,多种传输协议,跟踪等)相比,原始套接字,权衡是否接受你是根据你的情况。它看起来像你正在做一些金融交易应用程序和WCF可能是不适合你的情况(虽然我在金融行业是不是有经验的资格这一点)。

关于第一个问题,而不是托管在客户端的独立WCF服务,使客户端可以是服务本身,并使用netTCP如果可能的话结合双http绑定尝试。在服务行为中调整serviceThrottling元素中的属性。 .Net 4之前的默认值较低。

25

对于几乎所有的默认值,WCF都会选择非常安全的值。这遵循不让新手开发者自己拍摄的哲学。但是,如果您知道要更改的限制以及要使用的绑定,则可以获得合理的性能和缩放比例。

在我的核心i5-2400(四核,没有超线程,3.10 GHz)下面的解决方案将运行1000个客户端,每个客户端有1000个回调,平均总运行时间为20秒。这是20秒内1,000,000次WCF呼叫。

不幸的是我无法让你的F#程序运行直接比较。如果你在你的盒子上运行我的解决方案,你可以发表一些F#和C#WCF性能比较数字吗?


免责声明:以下旨在概念的证明。其中一些设置对生产没有意义。

我做了什么:

  • 删除了双螺旋结合并有客户创造他们自己的 服务主机接收回调。这实质上是双面绑定在底层所做的事情。 (这也是Pratik的 建议)
  • 更改绑定到netTcpBinding。
  • 更改限制值:
    • WCF:maxConcurrentCalls,maxConcurrentSessions, maxConcurrentInstances所有1000
    • TCP binding:MAXCONNECTIONS = 1000
    • 线程池:最小工作线程= 1000,最小的IO线程= 2000
  • 添加了IsOneWay来服务操作

请注意,在此原型中,所有服务和客户端都位于同一App域中并共享相同的线程池。

我的教训:

  • 当有一个“无连接可以作出,因为目标机器积极地拒绝它”客户端异常
    • 可能的原因:
      1. WCF限制了已达到
      2. TCP限制已达到
      3. 没有I/O广告可用于处理该通话。
    • 为#3的解决方案是要么:
      1. 增加分钟IO线程数 - 或 -
      2. 拥有的StockService做了一个工作线程回调(这确实增加总运行时间)
  • 添加IsOneWay将运行时间减半(从40秒到20秒)。

运行在核心i5-2400上的程序输出。 注意定时器的使用与原始问题中的不同(请参阅代码)。

All client hosts open. 
Service Host opened. Starting timer... 
Press ENTER to close the host one you see 'ALL DONE'. 
Client #100 completed 1,000 results in 0.0542168 s 
Client #200 completed 1,000 results in 0.0794684 s 
Client #300 completed 1,000 results in 0.0673078 s 
Client #400 completed 1,000 results in 0.0527753 s 
Client #500 completed 1,000 results in 0.0581796 s 
Client #600 completed 1,000 results in 0.0770291 s 
Client #700 completed 1,000 results in 0.0681298 s 
Client #800 completed 1,000 results in 0.0649353 s 
Client #900 completed 1,000 results in 0.0714947 s 
Client #1000 completed 1,000 results in 0.0450857 s 
ALL DONE. Total number of clients: 1000 Total runtime: 19323 msec 

代码都在一个控制台应用程序文件:

using System; 
using System.Collections.Generic; 
using System.ServiceModel; 
using System.Diagnostics; 
using System.Threading; 
using System.Runtime.Serialization; 

namespace StockApp 
{ 
    [DataContract] 
    public class Stock 
    { 
     [DataMember] 
     public DateTime FirstDealDate { get; set; } 
     [DataMember] 
     public DateTime LastDealDate { get; set; } 
     [DataMember] 
     public DateTime StartDate { get; set; } 
     [DataMember] 
     public DateTime EndDate { get; set; } 
     [DataMember] 
     public decimal Open { get; set; } 
     [DataMember] 
     public decimal High { get; set; } 
     [DataMember] 
     public decimal Low { get; set; } 
     [DataMember] 
     public decimal Close { get; set; } 
     [DataMember] 
     public decimal VolumeWeightedPrice { get; set; } 
     [DataMember] 
     public decimal TotalQuantity { get; set; } 
    } 

    [ServiceContract] 
    public interface IStock 
    { 
     [OperationContract(IsOneWay = true)] 
     void GetStocks(string address); 
    } 

    [ServiceContract] 
    public interface IPutStock 
    { 
     [OperationContract(IsOneWay = true)] 
     void PutStock(Stock stock); 
    } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] 
    public class StocksService : IStock 
    { 
     public void SendStocks(object obj) 
     { 
      string address = (string)obj; 
      ChannelFactory<IPutStock> factory = new ChannelFactory<IPutStock>("CallbackClientEndpoint"); 
      IPutStock callback = factory.CreateChannel(new EndpointAddress(address)); 

      Stock st = null; st = new Stock 
      { 
       FirstDealDate = System.DateTime.Now, 
       LastDealDate = System.DateTime.Now, 
       StartDate = System.DateTime.Now, 
       EndDate = System.DateTime.Now, 
       Open = 495, 
       High = 495, 
       Low = 495, 
       Close = 495, 
       VolumeWeightedPrice = 495, 
       TotalQuantity = 495 
      }; 

      for (int i = 0; i < 1000; ++i) 
       callback.PutStock(st); 

      //Console.WriteLine("Done calling {0}", address); 

      ((ICommunicationObject)callback).Shutdown(); 
      factory.Shutdown(); 
     } 

     public void GetStocks(string address) 
     { 
      /// WCF service methods execute on IO threads. 
      /// Passing work off to worker thread improves service responsiveness... with a measurable cost in total runtime. 
      System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SendStocks), address); 

      // SendStocks(address); 
     } 
    } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)] 
    public class Callback : IPutStock 
    { 
     public static int CallbacksCompleted = 0; 
     System.Diagnostics.Stopwatch timer = Stopwatch.StartNew(); 
     int n = 0; 

     public void PutStock(Stock st) 
     { 
      ++n; 
      if (n == 1000) 
      { 
       //Console.WriteLine("1,000 results in " + this.timer.Elapsed.TotalSeconds + "s"); 

       int compelted = Interlocked.Increment(ref CallbacksCompleted); 
       if (compelted % 100 == 0) 
       { 
        Console.WriteLine("Client #{0} completed 1,000 results in {1} s", compelted, this.timer.Elapsed.TotalSeconds); 

        if (compelted == Program.CLIENT_COUNT) 
        { 
         Console.WriteLine("ALL DONE. Total number of clients: {0} Total runtime: {1} msec", Program.CLIENT_COUNT, Program.ProgramTimer.ElapsedMilliseconds); 
        } 
       } 
      } 
     } 
    } 

    class Program 
    { 
     public const int CLIENT_COUNT = 1000;   // TEST WITH DIFFERENT VALUES 

     public static System.Diagnostics.Stopwatch ProgramTimer; 

     static void StartCallPool(object uriObj) 
     { 
      string callbackUri = (string)uriObj; 
      ChannelFactory<IStock> factory = new ChannelFactory<IStock>("StockClientEndpoint"); 
      IStock proxy = factory.CreateChannel(); 

      proxy.GetStocks(callbackUri); 

      ((ICommunicationObject)proxy).Shutdown(); 
      factory.Shutdown(); 
     } 

     static void Test() 
     { 
      ThreadPool.SetMinThreads(CLIENT_COUNT, CLIENT_COUNT * 2); 

      // Create all the hosts that will recieve call backs. 
      List<ServiceHost> callBackHosts = new List<ServiceHost>(); 
      for (int i = 0; i < CLIENT_COUNT; ++i) 
      { 
       string port = string.Format("{0}", i).PadLeft(3, '0'); 
       string baseAddress = "net.tcp://localhost:7" + port + "/"; 
       ServiceHost callbackHost = new ServiceHost(typeof(Callback), new Uri[] { new Uri(baseAddress)}); 
       callbackHost.Open(); 
       callBackHosts.Add(callbackHost);    
      } 
      Console.WriteLine("All client hosts open."); 

      ServiceHost stockHost = new ServiceHost(typeof(StocksService)); 
      stockHost.Open(); 

      Console.WriteLine("Service Host opened. Starting timer..."); 
      ProgramTimer = Stopwatch.StartNew(); 

      foreach (var callbackHost in callBackHosts) 
      { 
       ThreadPool.QueueUserWorkItem(new WaitCallback(StartCallPool), callbackHost.BaseAddresses[0].AbsoluteUri); 
      } 

      Console.WriteLine("Press ENTER to close the host once you see 'ALL DONE'."); 
      Console.ReadLine(); 

      foreach (var h in callBackHosts) 
       h.Shutdown(); 
      stockHost.Shutdown(); 
     } 

     static void Main(string[] args) 
     { 
      Test(); 
     } 
    } 

    public static class Extensions 
    { 
     static public void Shutdown(this ICommunicationObject obj) 
     { 
      try 
      { 
       obj.Close(); 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("Shutdown exception: {0}", ex.Message); 
       obj.Abort(); 
      } 
     } 
    } 
} 

的app.config:

<?xml version="1.0" encoding="utf-8" ?> 
<configuration> 
    <system.serviceModel> 
    <services> 
     <service name="StockApp.StocksService"> 
     <host> 
      <baseAddresses> 
      <add baseAddress="net.tcp://localhost:8123/StockApp/"/> 
      </baseAddresses> 
     </host> 
     <endpoint address="" binding="netTcpBinding" bindingConfiguration="tcpConfig" contract="StockApp.IStock"> 
      <identity> 
      <dns value="localhost"/> 
      </identity> 
     </endpoint> 
     </service> 

     <service name="StockApp.Callback"> 
     <host> 
      <baseAddresses> 
      <!-- Base address defined at runtime. --> 
      </baseAddresses> 
     </host> 
     <endpoint address="" binding="netTcpBinding" bindingConfiguration="tcpConfig" contract="StockApp.IPutStock"> 
      <identity> 
      <dns value="localhost"/> 
      </identity> 
     </endpoint> 
     </service> 
    </services> 

    <client> 
     <endpoint name="StockClientEndpoint" 
       address="net.tcp://localhost:8123/StockApp/" 
           binding="netTcpBinding" 
       bindingConfiguration="tcpConfig" 
           contract="StockApp.IStock" > 
     </endpoint> 

     <!-- CallbackClientEndpoint address defined at runtime. --> 
     <endpoint name="CallbackClientEndpoint" 
       binding="netTcpBinding" 
       bindingConfiguration="tcpConfig" 
       contract="StockApp.IPutStock" > 
     </endpoint> 
    </client> 

    <behaviors> 
     <serviceBehaviors> 
     <behavior> 
      <!--<serviceMetadata httpGetEnabled="true"/>--> 
      <serviceDebug includeExceptionDetailInFaults="true"/> 
      <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="1000" maxConcurrentInstances="1000" /> 
     </behavior> 
     </serviceBehaviors> 
    </behaviors> 

    <bindings> 
     <netTcpBinding> 
     <binding name="tcpConfig" listenBacklog="100" maxConnections="1000"> 
      <security mode="None"/> 
      <reliableSession enabled="false" /> 
     </binding> 
     </netTcpBinding> 
    </bindings> 
    </system.serviceModel> 
</configuration> 

更新: 我只是试图用netNamedPipeBinding上述溶液:

<netNamedPipeBinding > 
    <binding name="pipeConfig" maxConnections="1000" > 
     <security mode="None"/> 
    </binding> 
    </netNamedPipeBinding> 

它实际上得到3秒慢(从20至23秒)。由于这个特定的例子都是进程间的,我不知道为什么。如果有人有一些见解,请评论。

2

我会说这取决于你的目标。如果你想尽可能地推动你的硬件,那么很容易获得10,000+个连接的客户端,秘密是尽量减少在垃圾收集器中花费的时间并且有效地使用套接字。

我在F#这里接几个帖子:http://moiraesoftware.com

Im做一些正在进行的工作有一个叫骨折-IO这里库:https://github.com/fractureio/fracture

您可能要检查这些出来的想法.. 。