6

首先让我说我是 Scala 新手;但是,我发现基于 Actor 的并发模型很有趣,因此我尝试将它用于一个相对简单的应用程序。我遇到的问题是,虽然我能够让应用程序工作,但结果远低于基于 Java 的等效解决方案的效率(在实时、CPU 时间和内存使用方面)使用从 ArrayBlockingQueue 中提取消息的线程。我想了解为什么。我怀疑这可能是我缺乏 Scala 知识,并且我导致了所有的低效率,但是在多次尝试重新设计应用程序但没有成功之后,我决定向社区寻求帮助。

我的问题是:我有一个 gzip 压缩文件,其中包含以下格式的多行:

SomeID comma_separated_list_of_values

例如:

1234 12,45,82

我想解析每一行并获得逗号分隔列表中每个值出现次数的总计数。

该文件可能非常大(压缩了几 GB),但每个文件的唯一值数量非常少(最多 500 个)。我认为这将是一个尝试编写基于 Actor 的并发 Scala 应用程序的好机会。我的解决方案涉及创建解析器 Actor 池的主驱动程序。然后,主驱动程序从标准输入读取行,将行传递给解析行并保持值的本地计数的 Actor。当主驱动程序读完最后一行时,它会向每个参与者传递一条消息,指示所有行都已读完。当参与者收到“完成”消息时,他们将他们的计数传递给一个聚合器,该聚合器将所有参与者的计数相加。汇总所有解析器的计数后,主驱动程序会打印出统计信息。

问题:我遇到的主要问题是这个应用程序的低效率令人难以置信。它比使用线程和 ArrayBlockingQueue 的“等效”Java 应用程序使用更多的 CPU 和更多的内存。为了正确看待这一点,以下是我为一个 1000 万行测试输入文件收集的一些统计数据:

Scala 1 Actor(解析器):

    real    9m22.297s
    user    235m31.070s
    sys     21m51.420s

Java 1 线程(解析器):

    real    1m48.275s
    user    1m58.630s
    sys     0m33.540s

Scala 5 演员:

    real    2m25.267s
    user    63m0.730s
    sys     3m17.950s

Java 5 线程:

    real    0m24.961s
    user    1m52.650s
    sys     0m20.920s

此外,top 报告说 Scala 应用程序的常驻内存大小大约是 10 倍。所以我们在这里谈论的是更多数量级的 CPU 和内存,而性能却要差几个数量级,我只是无法弄清楚是什么原因造成的。是 GC 问题,还是我以某种方式创建的对象副本比我意识到的要多得多?

可能重要或不重要的其他细节:

  • Scala 应用程序由 Java 类包装,因此我可以提供一个自包含的可执行 JAR 文件(我可能想要运行此应用程序的每台机器上都没有 Scala jar)。
  • 应用程序被调用如下: gunzip -c gzFilename | java -jar StatParser.jar

这是代码:

主驱动:

import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source

class StatCollector (numParsers : Int ) {
    private val parsers = new mutable.ArrayBuffer[StatParser]()
    private val aggregator = new StatAggregator()

    def generateParsers {
        for ( i <- 1 to numParsers ) {
            val parser = new StatParser( i, aggregator )
            parser.start
            parsers += parser
        }
    }


    def readStdin {
        var nextParserIdx = 0
        var lineNo = 1
        for ( line <- Source.stdin.getLines() ) {
            parsers( nextParserIdx ) ! line
            nextParserIdx += 1
            if ( nextParserIdx >= numParsers ) {
                nextParserIdx = 0
            }
            lineNo += 1
        }
    }

    def informParsers {
        for ( parser <- parsers ) {
            parser ! true
        }
    }

    def printCounts {
        val countMap = aggregator.getCounts()
        println( "ID,Count" )
        /*
        for ( key <- countMap.keySet ) {
            println( key + "," + countMap.getOrElse( key, 0 ) )
            //println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
        }
        */
        countMap.toList.sorted foreach {
            case (key, value) =>
                println( key + "," + value )
        }
    }

    def processFromStdIn {
        aggregator.start

        generateParsers

        readStdin
        process
    }

    def process {

        informParsers

        var completedParserCount = aggregator.getNumParsersAggregated
        while ( completedParserCount < numParsers ) {
            Thread.sleep( 250 )
            completedParserCount = aggregator.getNumParsersAggregated
        }

        printCounts
    }
}

解析器演员:

import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching

class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {

    private var countMap = new HashMap[String, Int]()
    private val sep1 = "\t"
    private val sep2 = ","


    def getCounts(): HashMap[String, Int] = {
        return countMap
    }

    def act() {
        loop {
            react {
                case line: String =>
                    {
                        val idx = line.indexOf( sep1 )
                        var currentCount = 0
                        if ( idx > 0 ) {
                            val tokens = line.substring( idx + 1 ).split( sep2 )
                            for ( token <- tokens ) {
                                if ( !token.equals( "" ) ) {
                                    currentCount = countMap.getOrElse( token, 0 )
                                    countMap( token ) = ( 1 + currentCount )
                                }
                            }

                        }
                    }
                case doneProcessing: Boolean =>
                    {
                        if ( doneProcessing ) {
                            // Send my stats to Aggregator
                            aggregator ! this
                        }
                    }
            }
        }
    }
}

聚合器演员:

import scala.actors.Actor
import collection.mutable.HashMap

class StatAggregator extends Actor {
    private var countMap = new HashMap[String, Int]()
    private var parsersAggregated = 0

    def act() {
        loop {
            react {
                case parser: StatParser =>
                    {
                        val cm = parser.getCounts()
                        for ( key <- cm.keySet ) {
                            val currentCount = countMap.getOrElse( key, 0 )
                            val incAmt = cm.getOrElse( key, 0 )
                            countMap( key ) = ( currentCount + incAmt )
                        }
                        parsersAggregated += 1
                    }
            }
        }
    }

    def getNumParsersAggregated: Int = {
        return parsersAggregated
    }

    def getCounts(): HashMap[String, Int] = {
        return countMap
    }
}

在理解这里发生的事情方面可以提供的任何帮助将不胜感激。

提前致谢!

- - 编辑 - -

由于很多人回复并要求提供 Java 代码,因此这是我创建的简单 Java 应用程序,用于比较目的。我意识到这不是很好的 Java 代码,但是当我看到 Scala 应用程序的性能时,我只是快速地写了一些东西来看看基于 Java 线程的实现将如何作为基线执行:

解析线程:

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class JStatParser extends Thread
{
    private ArrayBlockingQueue<String> queue;
    private Map<String, Integer> countMap;
    private boolean done;

    public JStatParser( ArrayBlockingQueue<String> q )
    {
        super( );
        queue = q;
        countMap = new Hashtable<String, Integer>( );
        done = false;
    }

    public Map<String, Integer> getCountMap( )
    {
        return countMap;
    }

    public void alldone( )
    {
        done = true;
    }

    @Override
    public void run( )
    {
        String line = null;
        while( !done || queue.size( ) > 0 )
        {
            try
            {
                // line = queue.take( );
                line = queue.poll( 100, TimeUnit.MILLISECONDS );
                if( line != null )
                {
                    int idx = line.indexOf( "\t" ) + 1;
                    for( String token : line.substring( idx ).split( "," ) )
                    {
                        if( !token.equals( "" ) )
                        {
                            if( countMap.containsKey( token ) )
                            {
                                Integer currentCount = countMap.get( token );
                                currentCount++;
                                countMap.put( token, currentCount );
                            }
                            else
                            {
                                countMap.put( token, new Integer( 1 ) );
                            }
                        }
                    }
                }
            }
            catch( InterruptedException e )
            {
                // TODO Auto-generated catch block
                System.err.println( "Failed to get something off the queue: "
                        + e.getMessage( ) );
                e.printStackTrace( );
            }
        }
    }
}

司机:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;

public class JPS
{
    public static void main( String[] args )
    {
        if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
        {
            System.err.println( "Usage: JPS [filename]" );
            System.exit( -1 );
        }

        int numParsers = Integer.parseInt( args[0] );
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
        List<JStatParser> parsers = new ArrayList<JStatParser>( );

        BufferedReader reader = null;

        try
        {
            if( args.length == 2 )
            {
                reader = new BufferedReader( new FileReader( args[1] ) );
            }
            else
            {
                reader = new BufferedReader( new InputStreamReader( System.in ) );
            }

            for( int i = 0; i < numParsers; i++ )
            {
                JStatParser parser = new JStatParser( q );
                parser.start( );
                parsers.add( parser );
            }

            String line = null;
            while( (line = reader.readLine( )) != null )
            {
                try
                {
                    q.put( line );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    System.err.println( "Failed to add line to q: "
                            + e.getMessage( ) );
                    e.printStackTrace( );
                }
            }

            // At this point, we've put everything on the queue, now we just
            // need to wait for it to be processed.
            while( q.size( ) > 0 )
            {
                try
                {
                    Thread.sleep( 250 );
                }
                catch( InterruptedException e )
                {
                }
            }

            Map<String,Integer> countMap = new Hashtable<String,Integer>( );
            for( JStatParser jsp : parsers )
            {
                jsp.alldone( );
                Map<String,Integer> cm = jsp.getCountMap( );
                for( String key : cm.keySet( ) )
                {
                    if( countMap.containsKey( key ))
                    {
                        Integer currentCount = countMap.get(  key );
                        currentCount += cm.get( key );
                        countMap.put( key, currentCount );
                    }
                    else
                    {
                        countMap.put(  key, cm.get( key ) );
                    }
                }
            }

            System.out.println( "ID,Count" );
            for( String key : new TreeSet<String>(countMap.keySet( ))  )
            {
                System.out.println( key + "," + countMap.get( key ) );
            }

            for( JStatParser parser : parsers )
            {
                try
                {
                    parser.join( 100 );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.exit(  0  );
        }
        catch( IOException e )
        {
            System.err.println( "Caught exception: " + e.getMessage( ) );
            e.printStackTrace( );
        }
    }
}
4

2 回答 2

7

我不确定这对演员来说是一个很好的测试用例。一方面,演员之间几乎没有互动。这是一个简单的 map/reduce,它需要并行性,而不是并发性。

演员的开销也很重,我不知道实际分配了多少线程。根据您拥有的处理器数量,您可能拥有比 Java 程序更少的线程——这似乎是这种情况,因为加速是 4 倍而不是 5 倍。

你编写演员的方式是针对闲置演员进行优化的,这种情况下你有成百上千的演员,但在任何时候只有少数人在做实际工作。如果你用while/receive而不是loop/来写演员react,他们会表现得更好。

现在,actor 可以很容易地将应用程序分发到多台计算机上,除了您违反了actor 的原则之一:您正在调用actor 对象上的方法。你永远不应该对演员这样做,事实上,Akka 阻止你这样做。这样做的一种更类似于演员的方式是,聚合器向每个演员询问他们的密钥集,计算他们的联合,然后对于每个密钥,要求所有演员发送他们对该密钥的计数。

但是,我不确定演员开销是否就是您所看到的。您没有提供有关 Java 实现的信息,但我敢说您使用可变映射,甚至可能使用单个并发可变映射 - 与您在 Scala 中所做的完全不同的实现。

也没有关于如何读取文件(如此大的文件可能存在缓冲问题)或如何在 Java 中解析的信息。由于大部分工作是读取和解析文件,而不是计算令牌,因此实现中的差异可以轻松克服任何其他问题。

最后,关于常驻内存大小,Scala 有一个 9 MB 的库(除了 JVM 带来的),这可能就是您所看到的。当然,如果您在 Java 中使用单个并发映射,而在 Scala 中使用 6 个不可变映射,那肯定会对内存使用模式产生很大影响。

于 2012-07-30T17:25:31.080 回答
-1

Scala 演员最后几天让位Akka 演员......还有更多即将到来 - Viktor 正在进一步努力使最后成为最好的:https ://twitter.com/viktorklang/status/229694698397257728

BTW:开源是强大的力量!这一天应该是所有基于 JVM 的社区的节日:

http://www.marketwire.com/press-release/azul-systems-announces-new-initiative-support-open-source-community-with-free-zing-jvm-1684899.htm

于 2012-07-30T17:35:01.203 回答