0

我正在用 java 开发一个应用程序,我正在使用一个共享的 LinkedBlocking 队列,并且我正在创建多个线程来读取和写入它。我已经创建了如下代码,但我无法获得所需的结果。

结果,我使用了一个共享文件,该文件由两个线程(读取和写入一个)写入。

请告诉我我的代码有什么问题:

消息阅读器.java

package com.aohandling.messagereader;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.aohandling.messagequeue.MessageQueue;

public class MessageReader implements Runnable
{
    public static BufferedWriter out;

    public static void init()
    {
    file = new File("AOHandle.txt");
    try
    {
        out = new BufferedWriter(new FileWriter(file, true));
        System.out.println("Init ");
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
    }

    static File file = null;

    public void run()
    {
    while (true)
    {
        try
        {
        SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
        String s = MessageQueue.getMessageQueue().poll();
        if (s != null)
        {
            out.write("queue - " + MessageQueue.getMessageQueue().poll() + "---"  + ft.format(new Date()) + "\n");
        }

        }
        catch (IOException e)
        {
        e.printStackTrace();
        }
    }
    }
}

MessageWriter.java

package com.aohandling.writer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.aohandling.messagequeue.MessageQueue;
import com.aohandling.messagereader.MessageReader;

public class MessageWriter implements Runnable
{

    int n;
    private int messageSequence;

    public MessageWriter(int messageSequence)
    {
    this.messageSequence = messageSequence;
    }

    public void run()
    {

    try
    {
        SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
        MessageReader.out.append("Writing----AO - " + this.messageSequence + "-----" + ft.format(new Date()) + "\n");
        MessageQueue.getMessageQueue().put("AO " + this.messageSequence);
    }
    catch (IOException | InterruptedException e)
    {
        e.printStackTrace();
    }
    }
}

消息队列.java

package com.aohandling.messagequeue;

import java.util.concurrent.LinkedBlockingQueue;

public class MessageQueue {

    private static LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();

    public static LinkedBlockingQueue<String> getMessageQueue() {
        return MessageQueue.messageQueue;
    }

    public static void setMessageQueue(LinkedBlockingQueue<String> messageQueue) {
        MessageQueue.messageQueue = messageQueue;
    }
}

TestAOHandlingRead.java

package com.aohandling.main;

import com.aohandling.messagereader.MessageReader;
import com.aohandling.writer.MessageWriter;

public class TestAOHandlingRead
{

    /**
     * @param args
     */
    public static void main(String[] args)
    {
    MessageReader.init();
    for (int i = 0; i <= 200; i++)
    {
        Thread readThread = new Thread(new MessageReader());
        readThread.start();
    }
    write();

    }
    public static void write()
    {
    for (int i = 0; i <= 20; i++)
    {
        if (i % 2 == 0)
        {
        try
        {
            Thread.sleep(500);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        }

        Thread writeThread = new Thread(new MessageWriter(i));
        writeThread.start();

    }
    }
}

TestAOHandlingWrite.java

package com.aohandling.main;

import java.util.concurrent.atomic.AtomicInteger;

import com.aohandling.writer.MessageWriter;

public class TestAOHandlingWrite {

    int count = 0;

    public int getCount()
    {
        return count;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {

//      MessageWriter.init();
        for (int i=0; i<= 20; i++) {
         if (i%2 ==0) {
             try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
         }

         Thread writeThread = new Thread(new MessageWriter(i));
            writeThread.start();

        }


    }

}
4

1 回答 1

0

我建议您使用 FileChannel,因为 File Channel 可以安全地被多个并发线程使用。此外,我重构了您的代码,在类加载器首次加载 MessageReader 类期间,将创建一次文件通道。

public class MessageReader implements Runnable {
    private static FileChannel channel;

    static {
    try {
        System.out.println("Init ");
        FileOutputStream fileOutputStream = new FileOutputStream(
                "AOHandle.txt", true);

        FileChannel channel = fileOutputStream.getChannel();
        System.out.println("Init ");

    } catch (IOException e) {
        e.printStackTrace();
    }
    }

        public void run() {
    while (true) {
        FileLock fileLock = null;
        try {
            SimpleDateFormat ft = new SimpleDateFormat(
                    "E yyyy.MM.dd 'at' hh:mm:ss a zzz");
            String s = MessageQueue.getMessageQueue().poll();
            if (s != null) {
                String message = "queue - "
                        + MessageQueue.getMessageQueue().poll() + "---"
                        + ft.format(new Date()) + "\n";
                fileLock = channel.lock();
                channel.write(ByteBuffer.wrap(message.getBytes()));
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fileLock != null) {
                    fileLock.release();
                }
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}
}

最佳实践是在一个地方打开一个文件的通道并在你的线程之间共享他,因为在你的代码中没有人关闭文件。

于 2013-10-17T07:19:23.603 回答