1

我正在使用 RXTX 库将一些数据发送到串口。发送数据后,我必须等待 1 秒才能收到 ACK。我使用 ArrayBlockingQueue 实现了这个功能:

例如。

val queue = ArrayBlockingQueue(1)

def send(data2Send : Array[Byte]) : Array[Byte]{
   out.write(data2Send)
   queue.poll(1000)
}

def receive(receivedData : Array[Byte]){
  queue.add(receivedData)
}

这工作得很好,但是因为我正在学习 Scala,所以我想使用插入线程和锁定结构的 Actor。

我的第一次尝试如下:

class Serial {

sender = new Sender(...)
new Receiver(...).start

class Sender {
      def send(data2Send : Array[Byte]) : Array[Byte]{
       out.write(data2Send)
         receiveWithin(WAIT_TIMEOUT_MILLIS) {
          case response => response
          case TIMEOUT => null
        }
      }
    }

    class Receiver extends Actor{
      def act{
        loop{
           sender ! read()
        }
      }
    }
}

但是这段代码抛出了 java.lang.AssertionError: assertion failed: 从属于其他参与者的通道接收。我认为问题在于我不能在行为定义之外使用接收或反应。我遵循的方法是正确的吗?

第二次尝试:

class Serial {

    new Sender(...).start
    new Receiver(...).start

    def send() = (sender ?! data2Send).asInstanceOf(Array[Byte])

    class Sender {
          def act() {
          loop{
          receive{
           out.write(data2Send)
             receiveWithin(WAIT_TIMEOUT_MILLIS) {
              case response => response
              case TIMEOUT => null
            }
          }
        }
       }
      }

        class Receiver extends Actor{
          def act{
            loop{
               sender ! read()
            }
          }
        }
    }

在第二次尝试中,我得到 java.util.NoSuchElementException: head of empty list when sender !read() 行被执行。它看起来要复杂得多

4

1 回答 1

1

除非你使用 NIO,否则在这种情况下你无法避免阻塞。最终,您的消息来自您需要来自的套接字read()(即某个线程,某处,必须阻塞)。

查看您的 3 个示例(即使假设它们都有效),如果您在六个月后的凌晨 2 点发现了一个错误,认为您更愿意查看哪个代码片段?我知道我会选择哪一个!

Actor 非常适合在事件驱动系统周围发送异步消息。在你点击一些使用套接字的外部 API 的地方,你可以在它们周围包裹一个类似 actor 的外观,以便它们可以与系统的其他部分交互(这样系统的其余部分就不会知道实现细节)。几点建议:

  • 对于必须处理读取/写入套接字的实际参与者,请保持简单

  • 尝试并组织系统,使与该参与者的通信是异步的(即其他参与者不会阻塞并等待回复)

  • 鉴于 scala 演员库即将被弃用,我将开始使用akka

于 2012-06-26T22:20:38.780 回答