15

我正在尝试在 python 中编写一个简单的工作证明 nonce-finder。

def proof_of_work(b, nBytes):
    nonce = 0
    # while the first nBytes of hash(b + nonce) are not 0
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + 1
    return nonce

现在我正在尝试进行多处理,因此它可以使用所有 CPU 内核并更快地找到 nonce。multiprocessing.Pool我的想法是多次使用和执行函数 proof_of_work,传递两个参数num_of_cpus_runningthis_cpu_id如下所示:

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id):
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
    return nonce

所以,如果有 4 个核心,每个核心都会像这样计算 nonce:

core 0: 0, 4, 8, 16, 32 ...
core 1: 1, 5, 9, 17, 33 ...
core 2: 2, 6, 10, 18, 34 ...
core 3: 3, 7, 15, 31, 38 ...

所以,我必须重写proof_of_work,所以当任何一个进程找到一个 nonce 时,其他人都停止寻找 nonce,考虑到找到的 nonce 必须是所需字节为 0 的最低值。如果 CPU 加速由于某种原因,并返回一个高于最低有效随机数的有效随机数,则工作证明无效。

我唯一不知道该怎么做的是,只有当进程 B 发现一个低于进程 A 现在正在计算的 nonce 的 nonce 时,进程 A 才会停止的部分。如果它更高,A 保持计算(以防万一)直到它到达 B 提供的随机数。

我希望我正确地解释了自己。另外,如果我写的任何东西有更快的实现,我很想听听。非常感谢!

4

4 回答 4

7

一个简单的选择是使用微批处理并检查是否找到了答案。太小的批次会因启动并行作业而产生开销,太大的批次会导致其他进程做额外的工作,而一个进程已经找到了答案。每批应该需要 1 - 10 秒才能有效。

示例代码:

from multiprocessing import Pool
from hashlib import sha256
from time import time


def find_solution(args):
    salt, nBytes, nonce_range = args
    target = '0' * nBytes

    for nonce in xrange(nonce_range[0], nonce_range[1]):
        result = sha256(salt + str(nonce)).hexdigest()

        #print('%s %s vs %s' % (result, result[:nBytes], target)); sleep(0.1)

        if result[:nBytes] == target:
            return (nonce, result)

    return None


def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    pool = Pool(n_processes)

    nonce = 0

    while True:
        nonce_ranges = [
            (nonce + i * batch_size, nonce + (i+1) * batch_size)
            for i in range(n_processes)
        ]

        params = [
            (salt, nBytes, nonce_range) for nonce_range in nonce_ranges
        ]

        # Single-process search:
        #solutions = map(find_solution, params)

        # Multi-process search:
        solutions = pool.map(find_solution, params)

        print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

        # Find non-None results
        solutions = filter(None, solutions)

        if solutions:
            return solutions

        nonce += n_processes * batch_size


if __name__ == '__main__':
    start = time()
    solutions = proof_of_work('abc', 6)
    print('\n'.join('%d => %s' % s for s in solutions))
    print('Solution found in %.3f seconds' % (time() - start))

输出(配备酷睿 i7 的笔记本电脑):

Searched 0 to 1999999
Searched 2000000 to 3999999
Searched 4000000 to 5999999
Searched 6000000 to 7999999
Searched 8000000 to 9999999
Searched 10000000 to 11999999
Searched 12000000 to 13999999
Searched 14000000 to 15999999
Searched 16000000 to 17999999
Searched 18000000 to 19999999
Searched 20000000 to 21999999
Searched 22000000 to 23999999
Searched 24000000 to 25999999
Searched 26000000 to 27999999
Searched 28000000 to 29999999
Searched 30000000 to 31999999
Searched 32000000 to 33999999
Searched 34000000 to 35999999
Searched 36000000 to 37999999
37196346 => 000000f4c9aee9d427dc94316fd49192a07f1aeca52f6b7c3bb76be10c5adf4d
Solution found in 20.536 seconds

单核耗时 76.468 秒。无论如何,这不是迄今为止找到解决方案的最有效方法,但它确实有效。例如,如果salt很长,那么SHA-256可以在盐被吸收后预先计算状态并从那里继续蛮力搜索。字节数组也可能比hexdigest().

于 2015-10-08T12:45:16.007 回答
6

执行此操作的一般方法是:

  1. 考虑工作数据包,例如执行特定范围的计算,范围不应该花费很长时间,比如 0.1 秒到 1 秒
  2. 让一些经理将工作包分发给工人
  3. 一个工作包结束后,告诉经理结果并请求一个新的工作包
  4. 如果工作完成并找到结果,则接受工人的结果并向他们发出不再执行工作的信号 - 工人现在可以安全地终止

这样,您不必每次迭代都与管理器核对(这会减慢一切),或者做一些讨厌的事情,例如在会话中停止线程。不用说,管理器需要是线程安全的。

这完全符合您的模型,因为您仍然需要其他工作人员的结果,即使已找到结果。


请注意,在您的模型中,一个线程可能与其他线程不同步,滞后。一旦找到结果,您就不想再进行一百万次计算。我只是从问题中重申这一点,因为我认为该模型是错误的。您应该修复模型而不是修复实现。

于 2015-09-12T13:53:17.900 回答
3

您可以使用 multiprocessing.Queue()。每个 CPU/进程有一个队列。当一个进程找到一个随机数时,它会将其放入其他进程的队列中。其他进程在 while 循环的每次迭代中检查他们的队列(非阻塞),如果上面有任何内容,它们会根据队列中的值决定继续或终止:

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id, qSelf, qOthers):
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
        try:
            otherNonce = qSelf.get(block=False)
            if otherNonce < nonce:
                return
        except:
            pass
    for q in qOthers:
        q.put(nonce)
    return nonce

qOthers 是属于其他进程的队列列表(每个 queue=multiprocessing.Queue())。

如果您决定按照我的建议使用队列,您应该能够编写上述方法的更好/更好的实现。

于 2015-10-05T21:09:43.930 回答
0

我喜欢通过更改为来改进 NikoNyrh 的pool.map答案pool.imap_unordered。使用imap_unordered将立即从任何工作人员返回结果,而无需等待所有工作人员完成。因此,一旦任何结果返回元组,我们就可以退出 while 循环。

def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    with Pool(n_processes) as pool:

        nonce = 0

        while True:
            nonce_ranges = [
                (nonce + i * batch_size, nonce + (i+1) * batch_size)
                for i in range(n_processes)
            ]

            params = [
                (salt, nBytes, nonce_range) for nonce_range in nonce_ranges

           ]
            print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

            for result in pool.imap_unordered(find_solution, params):
                if isinstance(result,tuple): return result
            
            nonce += n_processes * batch_size


于 2021-02-12T18:03:33.307 回答