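Reading from HttpResponseStream fails
2009-08-12

I'm having an issue whereby reading from an HttpResponseStream fails because the StreamReader I'm wrapping around it reads faster than the response stream receives the actual response. I'm retrieving a reasonably small file (around 60k), but the parser which processes the response into an actual object fails because it hits an unexpected character (code 65535), which from experience I know to be the character produced when you read from a StreamReader and there are no further characters available.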
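
For context on where code 65535 comes from: TextReader.Read() and TextReader.Peek() return an int and use -1 to signal that no character is available. A tokenizer that casts that result to char without checking it first would see 65535 (0xFFFF). A minimal sketch, using a StringReader as a stand-in for the response stream (the class and method names are made up for illustration):

```csharp
using System;
using System.IO;

public static class EndOfInputDemo
{
    // Consumes the whole reader, then returns the numeric value obtained by
    // casting the final Read() result (the -1 end marker) to char.
    public static int CastEndMarker(TextReader reader)
    {
        int next;
        do { next = reader.Read(); } while (next != -1); // read until nothing is left

        // Read() signalled "no character" with -1; the cast wraps it to 0xFFFF.
        return unchecked((char)next);
    }

    public static void Main()
    {
        Console.WriteLine(CastEndMarker(new StringReader("abc"))); // 65535
    }
}
```

Checking the int for -1 before casting is what distinguishes "no character" from a real character.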

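For the record, I know that the content being returned is valid and will parse correctly, since the failure occurs at a different point in the file each time I run the code. It's the parser.Load() line below where it fails.

Is there a way to ensure I've read all the content before attempting to parse it, short of copying the response stream into a MemoryStream or string and then processing it?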

/// <summary> 
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries 
    /// </summary> 
    /// <param name="sparqlQuery">SPARQL Query String</param> 
    /// <returns>RDF Graph</returns> 
    public Graph QueryWithResultGraph(String sparqlQuery) 
    { 
     try 
     { 
      //Build the Query URI 
      StringBuilder queryUri = new StringBuilder(); 
      queryUri.Append(this._endpoint.ToString()); 
      queryUri.Append("?query="); 
      queryUri.Append(Uri.EscapeDataString(sparqlQuery)); 

      if (!this._defaultGraphUri.Equals(String.Empty)) 
      { 
       queryUri.Append("&default-graph-uri="); 
       queryUri.Append(Uri.EscapeDataString(this._defaultGraphUri)); 
      } 

      //Make the Query via HTTP 
      HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false); 

      //Set up an Empty Graph ready 
      Graph g = new Graph(); 
      g.BaseURI = this._endpoint; 

      //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      parser.Load(g, new StreamReader(httpResponse.GetResponseStream())); 

      return g; 
     } 
     catch (UriFormatException uriEx) 
     { 
      //URI Format Invalid 
      throw new Exception("The format of the URI was invalid", uriEx); 
     } 
     catch (WebException webEx) 
     { 
      //Some sort of HTTP Error occurred 
      throw new Exception("A HTTP Error occurred", webEx); 
     } 
     catch (RDFException) 
     { 
      //Some problem with the RDF or Parsing thereof 
      throw; 
     } 
     catch (Exception) 
     { 
      //Other Exception 
      throw; 
     } 
    } 

    /// <summary> 
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint 
    /// </summary> 
    /// <param name="target">URI to make Request to</param> 
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param> 
    /// <returns>HTTP Response</returns> 
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly) 
    { 
     //Expect errors in this function to be handled by the calling function 

     //Set-up the Request 
     HttpWebRequest httpRequest; 
     HttpWebResponse httpResponse; 
     httpRequest = (HttpWebRequest)WebRequest.Create(target); 

     //Use HTTP GET/POST according to user set preference 
     if (!sparqlOnly) 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader(); 
      //For the time being drop the application/json as this doesn't play nice with Virtuoso 
      httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty); 
     } 
     else 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader(); 
     } 
     httpRequest.Method = this._httpMode; 
     httpRequest.Timeout = this._timeout; 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugRequest(httpRequest); 
     } 

     httpResponse = (HttpWebResponse)httpRequest.GetResponse(); 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugResponse(httpResponse); 
     } 

     return httpResponse; 
    } 

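Edit

To clarify what I've already stated: this is not a bug in the parser, it is an issue of the StreamReader reading faster than the response stream provides data. I can work around it by doing the following, but would like suggestions for a better or more elegant solution: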

  //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      Stream response = httpResponse.GetResponseStream(); 
      MemoryStream temp = new MemoryStream(); 
      Tools.StreamCopy(response, temp); 
      response.Close(); 
      temp.Seek(0, SeekOrigin.Begin); 
      parser.Load(g, new StreamReader(temp)); 
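
Tools.StreamCopy is not shown in this post. A minimal sketch of what such a helper might look like, assuming it simply copies bytes until the source is exhausted (the class name and buffer size here are illustrative, not the actual implementation):

```csharp
using System;
using System.IO;

public static class StreamCopyDemo
{
    // Hypothetical stand-in for Tools.StreamCopy: keeps reading until Read()
    // returns 0, which is how a Stream signals that there is no more data.
    public static void StreamCopy(Stream input, Stream output)
    {
        byte[] buffer = new byte[4096];
        int read;
        while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            // Only write the bytes actually read in this round.
            output.Write(buffer, 0, read);
        }
    }

    public static void Main()
    {
        byte[] data = new byte[60000];
        new Random(1).NextBytes(data);

        using (MemoryStream source = new MemoryStream(data))
        using (MemoryStream temp = new MemoryStream())
        {
            StreamCopy(source, temp);
            temp.Seek(0, SeekOrigin.Begin); // rewind before handing it to a reader
            Console.WriteLine(temp.Length);
        }
    }
}
```

On .NET Framework 4 and later, the built-in Stream.CopyTo(Stream) does the same job.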

Edit 2

BlockingStreamReader class, as per Eamon's suggestion:

/// <summary> 
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams) 
/// </summary> 
public sealed class BlockingStreamReader : StreamReader 
{ 
    private bool _peeked = false; 
    private int _peekChar = -1; 

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { } 

    public BlockingStreamReader(Stream stream) : base(stream) { } 

    public override int Read() 
    { 
     if (this._peeked) 
     { 
      this._peeked = false; 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      char[] cs = new char[1]; 
      base.ReadBlock(cs, 0, 1); 

      return cs[0]; 
     } 
    } 

    public override int Peek() 
    { 
     if (this._peeked) 
     { 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      this._peeked = true; 

      char[] cs = new char[1]; 
      base.ReadBlock(cs, 0, 1); 

      this._peekChar = cs[0]; 
      return this._peekChar; 
     } 
    } 

    public new bool EndOfStream 
    { 
     get 
     { 
      return (base.EndOfStream && !this._peeked); 
     } 
    } 
} 

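Edit 3

Here is a much better solution which can wrap any TextReader and provides an EndOfStream property. It uses an internal buffer which is filled using ReadBlock() on the wrapped TextReader. All the Read() methods of the reader are defined in terms of this buffer, and the buffer size is configurable: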

/// <summary> 
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency. 
/// </summary> 
/// <remarks> 
/// <para> 
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data. All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see> 
/// </para> 
/// </remarks> 
public sealed class BlockingTextReader : TextReader 
{ 
    private char[] _buffer; 
    private int _pos = -1; 
    private int _bufferAmount = -1; 
    private bool _finished = false; 
    private TextReader _reader; 

    public const int DefaultBufferSize = 1024; 

    public BlockingTextReader(TextReader reader, int bufferSize) 
    { 
     if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader"); 
     if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize"); 
     this._reader = reader; 
     this._buffer = new char[bufferSize]; 
    } 

    public BlockingTextReader(TextReader reader) 
     : this(reader, DefaultBufferSize) { } 

    public BlockingTextReader(Stream input, int bufferSize) 
     : this(new StreamReader(input), bufferSize) { } 

    public BlockingTextReader(Stream input) 
     : this(new StreamReader(input)) { } 

    private void FillBuffer() 
    { 
     this._pos = -1; 
     if (this._finished) 
     { 
      this._bufferAmount = 0; 
     } 
     else 
     { 
      this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length); 
      if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true; 
     } 
    } 

    public override int ReadBlock(char[] buffer, int index, int count) 
    { 
     if (count == 0) return 0; 
     if (buffer == null) throw new ArgumentNullException("buffer"); 
     if (index < 0) throw new ArgumentOutOfRangeException("index", "Index must be >= 0"); 
     if (count < 0) throw new ArgumentOutOfRangeException("count", "Count must be >= 0"); 
     if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small"); 

     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return 0; 
      } 
      else 
      { 
       return 0; 
      } 
     } 

     this._pos = Math.Max(0, this._pos); 
     if (count <= this._bufferAmount - this._pos) 
     { 
      //If we have sufficient things buffered to fulfil the request just copy the relevant stuff across 
      Array.Copy(this._buffer, this._pos, buffer, index, count); 
      this._pos += count; 
      return count; 
     } 
     else 
     { 
      int copied = 0; 
      while (copied < count) 
      { 
       int available = this._bufferAmount - this._pos; 
       if (count < copied + available) 
       { 
        //We can finish fulfilling this request this round 
        int toCopy = Math.Min(available, count - copied); 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy); 
        copied += toCopy; 
        this._pos += toCopy; 
        return copied; 
       } 
       else 
       { 
        //Copy everything we currently have available 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, available); 
        copied += available; 
        this._pos = this._bufferAmount; 

        if (!this._finished) 
        { 
         //If we haven't reached the end of the input refill our buffer and continue 
         this.FillBuffer(); 
         if (this.EndOfStream) return copied; 
         this._pos = 0; 
        } 
        else 
        { 
         //Otherwise we have reached the end of the input so just return what we've managed to copy 
         return copied; 
        } 
       } 
      } 
      return copied; 
     } 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     return this.ReadBlock(buffer, index, count); 
    } 

    public override int Read() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     this._pos++; 
     return (int)this._buffer[this._pos]; 
    } 

    public override int Peek() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     return (int)this._buffer[this._pos + 1]; 
    } 

    public bool EndOfStream 
    { 
     get 
     { 
      return this._finished && (this._pos >= this._bufferAmount - 1); 
     } 
    } 

    public override void Close() 
    { 
     this._reader.Close(); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     this.Close(); 
     this._reader.Dispose(); 
     base.Dispose(disposing); 
    } 
} 
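
So, nine years after it was launched, you just happen to be the first person in the world to discover that StreamReader reads faster than the Stream it wants to read from? – 2009-08-12 09:33:17

No, I just want to know whether anyone has a solution more elegant than the one above – RobV 2009-08-12 09:40:36

A solution to what? StreamReader does not read faster than the Stream. – 2009-08-12 09:49:28

Answers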

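Without knowing the specifics of the parser you're using, I can only guess at the bug, but there is a fairly easy mistake to make, one that the .NET Framework I/O libraries almost encourage...

Are you aware of the fact that Streams and TextReaders may read fewer bytes/characters than requested?

In particular, the documentation for TextReader.Read(char[] buffer, int index, int count) says:

Return Value

Type: System.Int32

The number of characters that have been read. The number will be less than or equal to count, depending on whether the data is available within the stream. This method returns zero if called when no more characters are left to read.

Emphasis mine: the key part is "less than or equal to count".

For example, if you call reader.Read(buffer, 0, 100) you cannot assume that 100 characters have been read.

Edit: It is quite likely that the parser does assume this, and that explains the behaviour you observed: if you fully cache the stream in a MemoryStream, there will always be enough characters to fulfil the request, but if you don't, the parser will receive fewer characters than requested at unpredictable times whenever the underlying stream is "slow".

Edit 2: You can fix your bug by replacing all instances of TextReader.Read() in the parser with TextReader.ReadBlock().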
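
To illustrate the difference between the two calls, here is a minimal sketch. TrickleReader is a made-up reader that hands out at most one character per Read(buffer, index, count) call, imitating a source whose data arrives in small pieces; it is an assumption for demonstration and not part of the question's code:

```csharp
using System;
using System.IO;

// Hypothetical reader that returns at most one character per buffered read.
public sealed class TrickleReader : TextReader
{
    private readonly string _text;
    private int _pos;

    public TrickleReader(string text) { _text = text; }

    public override int Peek()
    {
        return _pos < _text.Length ? _text[_pos] : -1;
    }

    public override int Read()
    {
        return _pos < _text.Length ? _text[_pos++] : -1;
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (count == 0 || _pos >= _text.Length) return 0;
        buffer[index] = _text[_pos++];
        return 1; // fewer than requested, which the contract allows
    }
}

public static class ReadVersusReadBlock
{
    public static void Main()
    {
        char[] buffer = new char[100];
        string text = new string('x', 100);

        // Read() may return early with whatever is available right now.
        int viaRead = new TrickleReader(text).Read(buffer, 0, 100);

        // ReadBlock() keeps calling Read() until count is met or input ends.
        int viaReadBlock = new TrickleReader(text).ReadBlock(buffer, 0, 100);

        Console.WriteLine(viaRead);      // 1
        Console.WriteLine(viaReadBlock); // 100
    }
}
```

Code that needs exactly count characters should either call ReadBlock() or loop on Read() until it returns zero.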

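I'm aware of this. I'm not sure whether it counts as a bug in StreamReader; it just seems to be how it behaves when the underlying stream may be slow. The parser isn't the problem: if I use the second code snippet (added to the original question), which reads the whole stream before parsing, it parses fine – RobV 2009-08-12 09:25:13

This _is_ a bug in the parser, with high probability. By design, a stream reader returns fewer characters than requested if the underlying stream is "slow". Using a memory stream as the underlying stream causes the StreamReader to always return the full number of characters, which works around the bug in the parser. – 2009-08-13 07:54:21

The parser uses a basic tokenizer which reads character by character using the Read() method, so you are most likely correct. I'll test ReadBlock() and accept your answer if it proves to solve the problem – RobV 2009-08-13 13:09:10

To support the blocking read scenario, rather than subclassing StreamReader you can also subclass TextReader: this avoids the issues with EndOfStream, and it means you can make any reader blocking, not just StreamReaders: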

public sealed class BlockingReader : TextReader 
{ 
    bool hasPeeked; 
    int peekChar; 
    readonly TextReader reader; 

    public BlockingReader(TextReader reader) { this.reader = reader; } 

    public override int Read() 
    { 
     if (!hasPeeked) 
      return reader.Read(); 
     hasPeeked = false; 
     return peekChar; 
    } 

    public override int Peek() 
    { 
     if (!hasPeeked) 
     { 
      peekChar = reader.Read(); 
      hasPeeked = true; 
     } 
     return peekChar; 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     if (buffer == null) 
      throw new ArgumentNullException("buffer"); 
     if (index < 0) 
      throw new ArgumentOutOfRangeException("index"); 
     if (count < 0) 
      throw new ArgumentOutOfRangeException("count"); 
     if ((buffer.Length - index) < count) 
      throw new ArgumentException("Buffer too small"); 

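     int peekCharsRead = 0; 
     if (hasPeeked && count > 0) 
     { 
      //A peeked -1 means end of input, so don't copy it into the buffer as a character 
      if (peekChar == -1) 
       return 0; 
      buffer[index] = (char)peekChar; 
      hasPeeked = false; 
      index++; 
      count--; 
      peekCharsRead++; 
     } 

     return peekCharsRead + reader.ReadBlock(buffer, index, count); 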
    } 

    protected override void Dispose(bool disposing) 
    { 
     try 
     { 
      if (disposing) 
       reader.Dispose(); 
     } 
     finally 
     { 
      base.Dispose(disposing); 
     } 
    } 
}