出现这个问题的原因是Akka stream中的流处理是异步的,可能会被快速的流淹没而漏掉CSV文件中的某些行。
解决方法是采用以下的代码:
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import akka.util.ByteString
import scala.concurrent.Future
def readCSV(path: String)(implicit mat: Materializer): Future[Seq[String]] = {
val fileSource = Source.fromIterator(() => scala.io.Source.fromFile(path).getLines())
val result: Future[Seq[String]] = fileSource.runFold(Seq.empty[String])((acc, elem) => acc :+ elem)
result
}
implicit val system = ActorSystem("CSVReader")
implicit val materializer = ActorMaterializer()
val csvPath = "path/to/your/file.csv"
val lines = readCSV(csvPath)(materializer)
lines.foreach(println)
这段代码以源迭代器开始,将其转化为 Akka stream 并且运行方法 runFold
以读取文件内容,并返回一个 Seq
类型的未来值。
读取文件后,可以使用 foreach
方法来处理每一行的数据。
上一篇:AkkaStream停止处理数据