0

I have unsorted pairs of integers, which represent some time intervals (first number is always less than the second one). The problem is to assign an integer, so called channel number(0..x) to each time interval, so that the intervals which do not collide will share the same channel. The least possible number of channels should be used.

For example those intervals will use 2 channels:

50 100 //1

10 70 //0

80 200 //0

I've implemented it using counting sort, to sort the input by the first column, and then used linear search to find chains of pairs, which follow one another. I also first of all copy the input *const*array to the new one, and at the end, assign values to the correct positions in the input array.

Yes, it is an assignment I've got from the University, and its implemented already, but can anybody please tell me how to make the code faster ? Which algorithm to use, so that sorting, chaining of pairs will be as fast as possible ? The length of the input array is up to 10 millions elements.

Here is the code:

#include <cstdlib>
#include <cstdio>
#include <iostream>
using namespace std;   

struct TPhone
 {
   unsigned int    m_TimeFrom;
   unsigned int    m_TimeTo;
   unsigned int    m_Channel;
 };

 class TElement
 {
 public:

  TPhone m_data;
  int index;

  TElement(TPhone  * const data, int index_)
  {
    m_data.m_TimeFrom=data->m_TimeFrom;
    m_data.m_TimeTo=data->m_TimeTo;
    m_data.m_Channel=-1;
    index=index_;
  }
  TElement()
  {
  }
 };

int FindNext(TElement** sorted_general_array, int general_array_size, int index_from)
{
  for (int i=index_from+1; i<general_array_size; i++ )
  {
    if (sorted_general_array[i]->m_data.m_TimeFrom > sorted_general_array[index_from]->m_data.m_TimeTo)
    {
      if (sorted_general_array[i]->m_data.m_Channel==(unsigned int)-1)
      {
        return i;
      }
    }
  }
  return -1;
}

int AssignChannels(TElement **sorted_general_array, int general_array_size)
{
  int current_channel=-1;
  for (int i=0; i<general_array_size; i++)
    {
      if (sorted_general_array[i]->m_data.m_Channel==(unsigned int)-1)
      {
        current_channel++;
        sorted_general_array[i]->m_data.m_Channel=current_channel;
        //cout << sorted_general_array[i]->m_data.m_TimeFrom << " " << sorted_general_array[i]->m_data.m_TimeTo << " " << sorted_general_array[i]->m_data.m_Channel << endl;
        int next_greater=i;
        while (1)
        {
          next_greater=FindNext(sorted_general_array,general_array_size,next_greater);
          if (next_greater!=-1)
          {
            sorted_general_array[next_greater]->m_data.m_Channel=current_channel;
            //cout << sorted_general_array[next_greater]->m_data.m_TimeFrom << " " << sorted_general_array[next_greater]->m_data.m_TimeTo << " " << sorted_general_array[next_greater]->m_data.m_Channel << endl;
          }
          else
          {
            break;
          } 
        }
      }
    }
    return current_channel;
}

int AllocChannels ( TPhone  * const * req, int reqNr )
 {
  //initialize
  int count_array_size=1700000;
  int * count_array;
  count_array=new int [count_array_size];
  for (int i=0; i<count_array_size; i++)
  {
     count_array[i]=0;
  }
  //
  int general_array_size=reqNr;
  TElement ** general_array;
  general_array=new TElement *[general_array_size];
  for (int i=0; i<general_array_size; i++)
  {
    general_array[i]= new TElement(req[i],i);
  }
  //--------------------------------------------------
  //counting sort
  //count number of each element
  for (int i=0; i<general_array_size; i++)
  {
    count_array[general_array[i]->m_data.m_TimeFrom]++;
  }
  //modify array to find postiions
  for (int i=0; i<count_array_size-1; i++)
  {
    count_array[i+1]=count_array[i+1]+count_array[i];
  }
  //make output array, and fill in the sorted data
  TElement ** sorted_general_array;
  sorted_general_array=new TElement *[general_array_size];

  for (int i=0; i <general_array_size; i++)
  {
    int insert_position=count_array[general_array[i]->m_data.m_TimeFrom]-1;
    sorted_general_array[insert_position]=new TElement;

    //cout << "inserting " << general_array[i]->m_data.m_TimeFrom << " to " << insert_position << endl;
    sorted_general_array[insert_position]->m_data.m_TimeFrom=general_array[i]->m_data.m_TimeFrom;
    sorted_general_array[insert_position]->m_data.m_TimeTo=general_array[i]->m_data.m_TimeTo;
    sorted_general_array[insert_position]->m_data.m_Channel=general_array[i]->m_data.m_Channel;
    sorted_general_array[insert_position]->index=general_array[i]->index;


    count_array[general_array[i]->m_data.m_TimeFrom]--;
    delete  general_array[i];
  }
  //free memory which is no longer needed
  delete [] general_array;
  delete [] count_array;
  //--------------------------------------------------

  int channels_number=AssignChannels(sorted_general_array,general_array_size);
  if (channels_number==-1)
  {
    channels_number=0;
  }
  else
  {
    channels_number++;
  }

  //output
  for (int i=0; i<general_array_size; i++)
  {
    req[sorted_general_array[i]->index]->m_Channel=sorted_general_array[i]->m_data.m_Channel;
  }


  //free memory and return
  for (int i=0; i<general_array_size; i++)
  {
    delete sorted_general_array[i];
  }
  delete [] sorted_general_array;

  return channels_number;
 }                                                             


int main ( int argc, char * argv [] )
 {
   TPhone ** ptr;
   int cnt, chnl;

   if ( ! (cin >> cnt) ) return 1;

   ptr = new TPhone * [ cnt ];
   for ( int i = 0; i < cnt; i ++ )
    {
      TPhone * n = new TPhone;
      if ( ! (cin >> n -> m_TimeFrom >> n -> m_TimeTo) ) return 1;
      ptr[i] = n;
    }

   chnl = AllocChannels ( ptr, cnt );

   cout << chnl << endl;
   for ( int i = 0; i < cnt; i ++ )
    {
      cout << ptr[i] -> m_Channel << endl;
      delete ptr[i];
    }
   delete [] ptr; 
   return 0;
  }
4

6 回答 6

4

这个问题已经有一个公认的答案。但是,我想描述一种与公认答案略有不同的方法。

你必须测量

如果不进行测量,您将无法说出任何有关性能的信息。为了衡量我们需要测试用例。所以在我看来,第一项工作是创建一个生成测试用例的程序。

我做了一大堆假设,可能是不正确的,并生成了以下代码来生成测试用例:

#include <iostream>
#include <random>

int
main()
{
    const unsigned N = 10000000;
    std::mt19937_64 eng(0);
    std::uniform_int_distribution<unsigned> start_time(0, N);
    std::chi_squared_distribution<> duration(4);
    std::cout << N << '\n';
    for (unsigned i = 0; i < N;)
    {
        unsigned st = start_time(eng);
        unsigned et = st + static_cast<unsigned>(duration(eng));
        if (et > st)
        {
            std::cout << st << ' ' << et << '\n';
            ++i;
        }
    }
}

可以改变 的值N、随机数引擎上的播种范围(如果不是随机数引擎的选择)、开始时间的范围以及持续时间概率分布的类型/形状。我凭空提出了这些选择。您的教授可能对为这个问题生成合理的测试用例有更好的想法。但是衡量一些东西总比什么都不衡量好。

利用std::lib

标准库充满了容器和算法。这段代码不仅经过调试,而且效率很高。重复使用此代码是一种很好的编码风格,因为:

  1. 它教你识别容器,以及何时使用什么容器。
  2. 它教你识别算法,以及何时使用什么算法。
  3. std:lib.
  4. 它使其他人更容易阅读您的代码,因为他们将了解标准定义的容器和算法。
  5. 它使您的代码更容易调试,因为代码中出现错误的概率远高于代码中出现错误的概率std::lib(尽管两者的概率都不为零)。

例如

我用 I/O 扩充了您的TPhone结构,以减轻您在 main 中执行的 I/O 的复杂性:

friend
std::istream&
operator>>(std::istream& is, TPhone& p)
{
    return is >> p.m_TimeFrom >> p.m_TimeTo;
}

friend
std::ostream&
operator<<(std::ostream& os, const TPhone& p)
{
    return os << '{' <<  p.m_TimeFrom << ", "
                     <<  p.m_TimeTo << ", "
                     << p.m_Channel << '}';
}

我选择vector<TPhone>保留所有电话。这简化了这一点:

int main ( int argc, char * argv [] )
 {
   TPhone ** ptr;
   int cnt, chnl;

   if ( ! (cin >> cnt) ) return 1;

   ptr = new TPhone * [ cnt ];
   for ( int i = 0; i < cnt; i ++ )
    {
      TPhone * n = new TPhone;
      if ( ! (cin >> n -> m_TimeFrom >> n -> m_TimeTo) ) return 1;
      ptr[i] = n;
    }

归根结底:

int main()
{
    using namespace std;
    vector<TPhone> ptr;
    int cnt;
    if (!(cin >> cnt)) return 1;
    ptr.reserve(cnt);
    for (int i = 0; i < cnt; ++i)
    {
        TPhone n;
        if (!(cin >> n)) return 1;
        ptr.push_back(n);
    }

事实证明,我的版本比你的更有效。我只是通过学习如何使用来“免费”获得这种效率std::vector

AllocChannels现在可以采取std::vector<TPhone>&

int
AllocChannels(std::vector<TPhone>& ptr)

在这里,我使用了我能想到的最简单的算法。不是因为我认为它可能是最快的,而是因为您需要一个基准来衡量。事实证明,简单并不总是很慢……</p>

int
AllocChannels(std::vector<TPhone>& ptr)
{
    using namespace std;
    if (ptr.size() == 0)
        return 0;
    // sort TPhone's by x.m_TimeFrom
    sort(ptr.begin(), ptr.end(), [](const TPhone& x, const TPhone& y)
                                       {
                                           return x.m_TimeFrom < y.m_TimeFrom;
                                       });
   // Create channel 0 and mark it as busy by the ptr[0] until ptr[0].m_TimeTo
    vector<unsigned> channels(1, ptr.front().m_TimeTo);
    ptr.front().m_Channel = 0;
   // For each call after the first one ...
    for (auto i = next(ptr.begin()); i != ptr.end(); ++i)
    {
        // Find the first channel that isn't busy at this m_TimeFrom
        auto j = find_if(channels.begin(), channels.end(),
                                           [&](unsigned tf)
                                             {
                                                 return tf < i->m_TimeFrom;
                                             });
        if (j != channels.end())
        {
           // Found a non-busy channel, record it in use for this call
           i->m_Channel = j - channels.begin();
           // Mark the channel busy until m_TimeTo
           *j = i->m_TimeTo;
        }
        else
        {
            // Record a new channel for this call
            i->m_Channel = channels.size();
            // Create a new channel and mark it busy until m_TimeTo
            channels.push_back(i->m_TimeTo);
        }
    }
    return channels.size();
}

我使用了一些 C++11 功能,因为它们很方便(例如 auto 和 lambdas)。如果您没有这些功能可用,它们很容易在 C++03 中解决。我使用的基本算法是按 排序m_TimeFrom,然后对已排序的调用列表进行线性遍历,并为每个调用在一组通道中线性搜索一个未使用的通道(创建一个新的如果全部都在使用中,则为一个)。注意使用标准算法sortfind_if. 重新实现这些没有意义,尤其是对于基线测试用例。

我曾经<chrono>为所有事情计时:

auto t0 = chrono::high_resolution_clock::now();
int chnl = AllocChannels(ptr);
auto t1 = std::chrono::high_resolution_clock::now();

我以完全相同的方式检测您的代码,以便我可以测试两者。这是我的结果,首先生成一个长度 = {100, 1000, 10000, 100000, 1000000, 10000000} 的测试用例,并且对于每个长度,首先运行您的代码然后是我的,两者都仅使用此输出:

cout << "#intervals = " << cnt << '\n';
cout << "#channels = " << chnl << '\n';
cout << "time = " << chrono::duration<double>(t1-t0).count() << "s\n";

这是我得到的:

$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out > test.dat
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 100
#channels = 10
time = 0.00565518s
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 100
#channels = 10
time = 6.934e-06s

$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out > test.dat
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 1000
#channels = 17
time = 0.00578557s
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 1000
#channels = 17
time = 5.4779e-05s

$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out > test.dat
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 10000
#channels = 16
time = 0.00801314s
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 10000
#channels = 16
time = 0.000656864s

$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out > test.dat
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 100000
#channels = 18
time = 0.0418109s
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 100000
#channels = 18
time = 0.00788054s

$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out > test.dat
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 1000000
#channels = 19
time = 0.688571s
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 1000000
#channels = 19
time = 0.093764s

$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out > test.dat
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
Segmentation fault: 11
$ clang++ -stdlib=libc++ -std=c++11 test.cpp -O3
$ a.out < test.dat
#intervals = 10000000
#channels = 21
time = 1.07429s

概括

这里没有人,包括我自己,预测到最简单的解决方案会始终以惊人的数量击败您的第一次尝试。这可能是我生成的测试用例的一个特征。这将是您通过生成其他测试用例来衡量的其他内容。

对于您的 N = 10000000,我不知道分段错误的原因。我没有花时间研究您的代码。坦率地说,我发现你的代码很复杂。

我忽略了编写正确性测试。这实际上应该是我的第一步。输出是否正确?我变得懒惰,只是瞥了一眼 N == 100 的情况,看看它是否正确。

由于 std::containers 和算法的重用,我的代码实际上比你的更容易调整性能。例如,您可以尝试std::lower_bound(二进制搜索)代替std::find_if,并衡量它是否改善了事情(我打赌不是,但您应该衡量,并使用test.dat您尊重的 a)。

将您的代码分解为容器和算法。重用标准定义的容器和算法(如果存在),或者创建您自己的容器和算法,以便在您未来的编码中重用。作为一名学生,我希望标准定义的标准足以满足您的大多数用例。

始终测试正确性(因为我在这里没有这样做:-))不要在没有测量的情况下假设任何关于性能的事情。二进制搜索并不总是比线性搜索快,尽管它具有更好的渐近复杂度。输入数据会强烈影响算法的性能。了解如何生成各种输入数据,以了解您的算法会受到怎样的影响。 <random>非常适合这项任务。

于 2013-11-04T00:17:42.517 回答
2

将条目存储在 astd::vector<TPhone>而不是 a 中TPhone **。这将在内存中连续布局连续TPhone对象,从而减少缓存未命中。

尝试使用其他数据类型而不是unsigned int. TPhone查看<cstdint>您可以尝试的类型。

于 2013-11-03T20:17:19.010 回答
0

如果您希望您的算法更快,您应该尽可能减少搜索。此外,您不需要知道哪些间隔“链接在一起”来确定每个间隔的正确通道(即不使用超过绝对必要的通道)。以下是我将用于获得最佳性能的步骤/技术:

  1. 像这样定义你的区间类,添加两个内联函数定义(我为 TimeDescriptor 使用一个结构只是风格问题,而不是这段代码完全时尚):

    typedef struct TimeDescriptor {
        unsigned time;
        bool isEnd;
    } TimeDescriptor;
    
    class TimeInterval {
        public:
            TimeDescriptor start, end;
            unsigned channel;
    
            TimeInterval(unsigned startTime, unsigned endTime) {
                start = (TimeDescriptor){ startTime, false };
                end = (TimeDescriptor){ endTime, true };
            }
    }
    
    inline TimeInterval* getInterval(TimeDescriptor* me) {
        return (me->isEnd) ? (TimeInterval*)(me - 1) : (TimeInterval*)me;
    }
    
    inline TimeDescriptor* getOther(TimeDescriptor* me) {
        return (me->isEnd) ? (me - 1) : (me + 1);
    }
    
  2. 创建一个指向所有 TimeDescriptor 的指针数组,每个 TimeInterval 有两个(一个用于开始,另一个用于结束)。

  3. 按时间对这个 TimeDescriptor 指针数组进行排序。确保将isEnd标志用作辅助排序键。我不确定如何定义间隔冲突,即两个间隔 (20, 30) 和 (30, 40) 是否冲突,如果它们冲突,则在开始时间之后以相同的值对结束时间进行排序,如果没有冲突,以相同的值对开始时间之前的结束时间进行排序。

    无论如何,我建议只使用标准的快速排序实现来对数组进行排序。

  4. 为未使用的通道号创建堆栈。这个堆栈的重要之处在于: 它必须允许您在恒定时间内获取/推送一个通道号,理想情况下更新内存中不超过两个数字;并且它必须是无底的,即它必须允许你弹出任意数量的值,产生一个递增的整数序列。

    实现这种堆栈的最简单方法可能是编写一个小类,该类使用std::vector<unsigned>来存储空闲通道,并跟踪曾经使用过的最大通道数。每当无法从内部存储中处理弹出请求时,都会通过将最大通道号加一来生成新的通道号。

  5. 遍历排序的 TimeDescriptors 数组。每次遇到开始时间时,获取一个频道号,并将其存储在相应的 TimeInterval 中(使用getInterval())。每次遇到结束时间时,将其通道号推回空闲通道阵列。

  6. 当您完成时,您的免费频道堆栈将告诉您同时使用的最大频道数,并且每个 TimeInterval 都将包含要使用的正确频道号。您甚至可以通过简单地通过通道号重新排列 TimeInterval 数组来有效地计算共享通道的所有间隔链......

于 2013-11-03T22:15:50.947 回答
0

如果您对集合进行了排序,为什么要使用线性搜索?使用二进制搜索。

于 2013-11-03T20:38:54.390 回答
0

设 [a i , b i ) 为区间,i = 1, ..., n。您想要设计一个函数 channel(i),它为您的 n 个间隔中的每一个返回一个通道号。

您唯一的限制是没有两个相交的间隔可以在同一个通道上。这对应于一个无向图,其中您的区间是顶点,并且当且仅当相应的区间相交时,两个顶点之间才有边。

如果这些顶点形成一个独立的集合,您可以将通道 C 分配给一组特定的顶点(区间) 。

您想找到一组这种形式的独立集,其中所有这些集的并集覆盖了图,并且它们是成对不相交的。您需要尽可能少的独立集。

寻找最大独立集的(相关)问题是NP完全的。所以我认为你不应该期望找到一个多项式时间算法来找到为你提供最少通道数的解决方案。

更现实的期望有两种形式之一:(A) 花费超多项式时间来解决问题,或者 (B) 使用可能无法为您提供全局最优的近似算法。

对于 (B),您可以这样做:

feasible(M)

    initialize M empty channels (lists of intervals)

    sort intervals by a_i value

    for each interval I = [a_i, b_i):

        insert it into the channel for which the most recent
        interval is closest to the current interval (but not intersecting)

        if I cannot be inserted at the end of any channel, return false

    return true //the M channels are a feasible solution

现在使用此过程,您可以指数搜索可行返回 true 的最小 M。

尝试 M = 1, 2, 4, 8, 16, ... 直到你达到第一个 M = 2 k这样feasible(M)返回 true。然后在 2 k - 1和 2 k之间进行二分搜索,找到最小的 M。

于 2013-11-03T19:48:43.107 回答
0

很抱歉在这里成为死灵法师,但是在阅读了问题并发布了答案之后,我就是放不下这个。

频道分配算法:

该问题有一个非常有效的贪婪解决方案。考虑以下递归:

  • 将第一个间隔分配给通道 1
  • 对于每个剩余间隔:
    • 对于每个通道(分配至少 1 个间隔):
      • 如果与分配的间隔不存在冲突,则将间隔分配给该通道并处理下一个间隔。
    • 如果所有通道都存在冲突,则将间隔分配给新通道。

这将产生最佳数量的通道。通过对区间数进行归纳,证明是微不足道的。

对输入进行排序

速度的关键在于“如果不存在冲突”。这意味着将在已处理的内容和待处理的内容之间进行比较,并且应该很容易让自己相信首先对输入进行排序最终会比在处理它们时排序更快(或不对它们进行排序)。

如果您不相信,请考虑以下两个极端:

  1. 所有间隔重叠。
  2. 没有间隔重叠。

选择排序算法

我们需要按开始时间对输入进行排序,然后是结束时间。如果我们选择一个稳定的排序,这很容易,首先按结束时间排序,然后按开始时间排序。考虑到所有值都是整数,一个稳定版本的计数排序可能是最好的选择。输入的数量远大于输入的范围;并且内存使用不是一个重要的考虑因素。在这些条件下,这种排序在输入数量上是线性的。

对频道进行排序

通过对输入进行排序,我们只需将每个间隔与分配给每个通道的最后一个间隔进行比较。在没有区间重叠的极端情况下,该算法是线性的:O(n) 排序,+ O(n) 处理 = O(n)。在所有区间重叠的极端的另一端,如果没有进一步的改进,算法将是二次的。

为了改进这一点,而不是与所有频道进行比较,如果频道按最早结束时间排序,那么与第一个频道的冲突将自动指示所有频道都冲突。然后对于每个间隔,我们只需要 1 次比较,以及保持通道排序顺序所需的任何内容。

为此,我建议将通道保持在最小堆中(到结束时间)。比较所需的通道始终位于顶部。偷看那个频道,然后:

  • 如果有重叠,则创建一个新通道,将其添加到堆中。这个新通道可能必须以 O(lg m) 的成本向上移动,其中 m 是当前通道数。
  • 否则,弹出最小通道 O(lg m),将间隔添加到它(更改它的值)并将其添加回堆通常 O(lg m)。

在最坏的噩梦场景中,排序间隔将具有单调递增的开始时间和单调递减的结束时间。这给了我们 O(n + lg 1 + lg 2 + ... + lg n) = O(n + lg(n!)) = O(n + n lg n) = O 算法的最坏情况(n lg n)

真实世界

渐近更好并不总是更好。这实际上取决于输入的分布以及输入的大小。我相信这里描述的算法优于其他提出的算法,但是在实现中肯定有空间可以选择渐近相同但会产生不同结果的选择。

于 2015-12-02T21:47:32.527 回答