0

I am trying to use multi-threading to sort lists that are stored in a map. There are a large number of records, ~3.1 million, and sorting them in a single-threaded for loop takes many hours to complete. I'm hoping to get this time down as much as possible, ideally to within a few minutes (please don't laugh!).

Stacktrace:

    Exception in thread "main" java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:562)
    at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:591)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:689)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
    at com.salesforce.process.Process.startProcess(Process.java:51)
    at com.salesforce.process.Schedule.main(Schedule.java:10)
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.base/java.util.TimSort.mergeLo(TimSort.java:781)
    at java.base/java.util.TimSort.mergeAt(TimSort.java:518)
    at java.base/java.util.TimSort.mergeCollapse(TimSort.java:448)
    at java.base/java.util.TimSort.sort(TimSort.java:245)
    at java.base/java.util.Arrays.sort(Arrays.java:1307)
    at java.base/java.util.ArrayList.sort(ArrayList.java:1721)
    at com.salesforce.process.Process.lambda$startProcess$0(Process.java:51)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Class Object:

    import java.util.Comparator;

    public class MyObject {
        private Integer id;
        public String someString;
        public Double sortableValue;

        // ... constructors, getters and setters ...

        public static Comparator<MyObject> SortableValueComparator = new Comparator<MyObject>() {
            @Override
            public int compare(MyObject ds1, MyObject ds2) {
                Double sortableValue1 = ds1.getSortableValue();
                Double sortableValue2 = ds2.getSortableValue();
                // Intended: descending order. As written, this returns the sign of
                // Double.compare(value1, value2), which sorts ascending.
                if (Double.compare(sortableValue1, sortableValue2) == 0) {
                    return 0;
                } else if (Double.compare(sortableValue1, sortableValue2) < 0) {
                    return -1;
                } else {
                    return 1;
                }
            }
        };
    }

The Code:

And I'm trying to execute this in the code like this:

    Map<String,List<MyObject>> map = new HashMap<String,List<MyObject>>();
    // inject 3.1 million keys with List<MyObject> values, with 1-10 items in each list.

    map.values().parallelStream().forEach(list -> list.sort(MyObject.SortableValueComparator));

NOTE: this is not what I want to do, but I initially wrote the code like this and it worked. That is to say, my comparator works if I do it this way.

    for (List<MyObject> list : map.values()) {
        Collections.sort(list, MyObject.SortableValueComparator);
    }

However, it takes for.ev.er to complete, which sadly is not acceptable for our business case. What can this noob do to make parallelStream(), or some other way of threading this, work? If you need more info, please let me know! Thanks so much!!

Edit: I want to also give you guys a sample of the data below. So this is a Map<String,List<MyObject>>.

key (String): "key1", values (List<MyObject>): [{"a",0.0112},{"b",0.12},{"c",0.00512}]
key: "key2", values: [{"d",0.0922},{"a",0.0112},{"f",0.23}]
key: "key3", values: [{"z",0.141},{"w",0.432},{"x",0.0001}]

So, if I sorted the list of objects for key3, it would come back like this:

key: "key3", values: [{"w",0.432},{"z",0.141},{"x",0.0001}]

I want to apply this sort to every record.
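
For reference, the descending order shown for key3 can be sketched as below. This is only a minimal sketch under assumptions: `Item` is a hypothetical stand-in for MyObject that keeps just the two fields from the sample data, and `DESCENDING` and `sortedCopy` are names made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class DescendingSortSketch {
    // Hypothetical stand-in for MyObject (assumption: only these two fields matter here).
    static final class Item {
        final String someString;
        final double sortableValue;

        Item(String someString, double sortableValue) {
            this.someString = someString;
            this.sortableValue = sortableValue;
        }
    }

    // Ascending comparison on sortableValue, then reversed to get largest-first.
    static final Comparator<Item> DESCENDING =
            Comparator.comparingDouble((Item i) -> i.sortableValue).reversed();

    // Returns a sorted copy and leaves the input list untouched.
    static List<Item> sortedCopy(List<Item> input) {
        List<Item> copy = new ArrayList<>(input);
        copy.sort(DESCENDING);
        return copy;
    }

    public static void main(String[] args) {
        List<Item> key3 = List.of(
                new Item("z", 0.141), new Item("w", 0.432), new Item("x", 0.0001));
        for (Item item : sortedCopy(key3)) {
            System.out.println(item.someString); // w, then z, then x
        }
    }
}
```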

2 Answers

1

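It's best to put a breakpoint where the exception is thrown and inspect the values being compared. Then write a unit test that checks what happens when those values are passed to the comparator, and how the result compares with "equals" on the same two objects. Your comparator may be returning 0 for objects that are not "equal", i.e. the implementation of "equals" on MyObject compares something other than sortableValue. That causes problems when collections are merged.

So set a breakpoint, see which values break the contract, and capture them in a test or two. Once you've figured it out, you may need to add some extra fields to your comparator (if you don't control "equals", or it's existing code that you can't change) so that it matches "equals".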
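
The kind of test described above could look roughly like the following. It is a sketch, not a definitive implementation: `ComparatorContractCheck` and `findViolation` are hypothetical names, and the check simply brute-forces the three rules from the `java.util.Comparator` documentation (sign symmetry, transitivity, consistent ties) over a small sample, so it is O(n^3) and only suitable for a handful of captured values.

```java
import java.util.Comparator;
import java.util.List;

class ComparatorContractCheck {
    // Returns a description of the first contract violation found, or null if none.
    static <T> String findViolation(Comparator<T> cmp, List<T> values) {
        for (T a : values) {
            for (T b : values) {
                int ab = Integer.signum(cmp.compare(a, b));
                int ba = Integer.signum(cmp.compare(b, a));
                // Rule 1: sgn(compare(a, b)) == -sgn(compare(b, a)), including a == b.
                if (ab != -ba) {
                    return "sign symmetry broken for " + a + " and " + b;
                }
                for (T c : values) {
                    int bc = Integer.signum(cmp.compare(b, c));
                    int ac = Integer.signum(cmp.compare(a, c));
                    // Rule 2: a > b and b > c must imply a > c.
                    if (ab > 0 && bc > 0 && ac <= 0) {
                        return "transitivity broken for " + a + ", " + b + ", " + c;
                    }
                    // Rule 3: if a and b tie, they must compare the same way against c.
                    if (ab == 0 && ac != bc) {
                        return "inconsistent tie for " + a + ", " + b + ", " + c;
                    }
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Double> sample = List.of(0.0112, 0.12, 0.00512, 0.12, 0.0, -0.0, Double.NaN);
        Comparator<Double> viaDoubleCompare = Double::compare;
        Comparator<Double> neverZero = (a, b) -> a < b ? -1 : 1; // deliberately broken
        System.out.println(findViolation(viaDoubleCompare, sample));
        System.out.println(findViolation(neverZero, sample));
    }
}
```

Feeding it the values seen at the breakpoint, wrapped in whatever objects the real comparator takes, should show whether the comparator itself is at fault or whether the problem lies elsewhere.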

Answered 2022-01-03T22:24:06.090
0

Instead of using

    map.values().parallelStream().forEach(list -> list.sort(comparator))

I used

    map.values().stream().forEach(list -> list.sort(comparator))

and it worked!
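
A sequential stream avoids the exception but gives up the parallelism the question asks for. One possible explanation (an assumption, nothing in the question confirms it) is that the same List instance is stored under more than one key, so two worker threads end up sorting the same ArrayList at once, and ArrayList is not thread-safe. If that is the case, a sketch like the following keeps the parallel stream but sorts each distinct list instance only once. `ParallelSortSketch` and `sortEachListOnce` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ParallelSortSketch {
    // Sorts every distinct list instance exactly once, in parallel.
    // Returns the number of distinct list instances that were sorted.
    static <T> int sortEachListOnce(Map<String, List<T>> map, Comparator<? super T> cmp) {
        // Identity-based set: keys that point at the same list instance collapse to one entry.
        Set<List<T>> distinct = Collections.newSetFromMap(new IdentityHashMap<>());
        distinct.addAll(map.values());
        // Copy into an ArrayList so the parallel stream splits evenly across workers.
        new ArrayList<>(distinct).parallelStream().forEach(list -> list.sort(cmp));
        return distinct.size();
    }

    public static void main(String[] args) {
        List<Double> shared = new ArrayList<>(List.of(0.141, 0.432, 0.0001));
        List<Double> other = new ArrayList<>(List.of(0.0922, 0.0112, 0.23));
        Map<String, List<Double>> map = new HashMap<>();
        map.put("key1", shared);
        map.put("key2", shared); // same instance under a second key (the assumed problem)
        map.put("key3", other);
        int sorted = sortEachListOnce(map, Comparator.<Double>reverseOrder());
        System.out.println(sorted + " distinct lists sorted: " + shared + " " + other);
    }
}
```

This only helps if shared list instances really are the cause. If the objects inside the lists are being modified by another thread while the sort runs, the lists would need to be left alone until the sort has finished.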

Answered 2022-01-11T16:32:12.243