0

我正在制作一个分布式排序系统,其中每台机器(客户端)都有特定范围的值(分区)来存储。我发送和接收数据没有任何问题,但同步是我的问题。在从服务器检索想要的分区之前,它必须首先从其他客户端之一到达服务器。

我已经尝试过delay和其他各种 RPC 调用来尝试同步客户端和服务器,但是客户端运行不同步,程序中断。我想我必须合并futureor promise,但我不确定如何。

每个客户端都有一个唯一的 ID(从 0 开始),并将根据该编号存储分区。一行数据如下所示:AsfAGHM5om 00000000000000000000000000000000 0000222200002222000022220000222200002222000000001111

编辑:例如更新的代码

客户

  val numberOfClients = 3
  var sentPartitionsCounter = 0
  while(sentPartitionsCounter < numberOfClients) {
    if(sentPartitionsCounter != id){
      sendUnwantedPartition("src/main/scala/testFile."+sentPartitionsCounter.toString) //"partition."+client.sentPartitionsCounter
      sentPartitionsCounter += 1
    }
    client.getWantedPartition()
  }

  def sendUnwantedPartition(filename: String): Unit = {
    val dataList = Source.fromFile(filename).getLines.toList
    val dataSeq = for {
                    dataLine <- dataList
                    dataValues = dataLine.split(" ", 2)
                  } yield (Data(key = dataValues(0), value = dataValues(1)))
    val partitionID = filename takeRight 1 
    val request = Dataset(data = dataSeq, partitionID = partitionID.toInt)
    val response = blockingStub.getUnwantedPartitions(request)
    print("sent partitions")
  }

  def getWantedPartition(): Unit = {
    val request = ID(id = id)
    val response = blockingStub.sendWantedPartitions(request)
    val wantedPartitions = new File("clientFiles/wantedPartitions"+id.toString+".txt")
    val printWriter: PrintWriter = new PrintWriter(new FileWriter(wantedPartitions, true));

    if(!wantedPartitions.exists()){
      wantedPartitions.createNewFile()
    } 

    for {
      data <- response.data
    } yield (printWriter.append(data.key + " " + data.value + "\n"))
    
    printWriter.close();
  }

服务器

override def getUnwantedPartitions(req: Dataset) = {
  val filename = "serverPartitions/partition"+req.partitionID+".txt" 
  val partition = new File(filename)
  val printWriter: PrintWriter = new PrintWriter(new FileWriter(partition, true));

  if(!partition.exists()){ 
    partition.createNewFile()
  } 

  for {
    data <- req.data
  } yield (printWriter.append(data.key + " " + data.value + "\n"))

  printWriter.close();
  Future.successful(Text("Got unwanted data"))
}

override def sendWantedPartitions(req: ID) = {
  val filename = "serverPartitions/partition"+req.id+".txt"
  try {
    val dataList = Source.fromFile(filename).getLines.toList
    val dataSeq = for {
                    dataLine <- dataList
                    dataValues = dataLine.split(" ", 2)
                  } yield (Data(key = dataValues(0), value = dataValues(1)))
    val reply = Dataset(data = dataSeq, partitionID = req.id)
    new File(filename).delete()
    Future.successful(reply)
  } catch {
    // Partition hasn't been received yet
    case e: FileNotFoundException => Future.successful(Dataset())
  }
}

原型

syntax = "proto3";

package protoPackage;

service DistrSorting {
  rpc getUnwantedPartitions(Dataset) returns (Text) {}
  rpc sendWantedPartitions(ID) returns (Dataset) {}
}

message ID {
  int32 id = 1;
}

message Text {
  string text = 1;
}

message Dataset {
  repeated Data data = 1;
  int32 partitionID = 2;
}

message Data {
  string key = 1;
  string value = 2;
}
4

0 回答 0