ChatGPT解决这个技术问题 Extra ChatGPT

从整数流中查找运行中位数

可能重复:C 中的滚动中值算法

鉴于整数是从数据流中读取的。以有效的方式查找到目前为止读取的元素的中位数。

我读过的解决方案:我们可以在左侧使用最大堆来表示小于有效中位数的元素,在右侧使用最小堆来表示大于有效中位数的元素。

处理传入元素后,堆中的元素数量最多相差 1 个元素。当两个堆包含相同数量的元素时,我们找到堆的根数据的平均值作为有效中位数。当堆不平衡时,我们从包含更多元素的堆根中选择有效中位数。

但是我们如何构造一个最大堆和最小堆,即我们如何知道这里的有效中位数?我认为我们会在 max-heap 中插入 1 个元素,然后在 min-heap 中插入下一个 1 个元素,以此类推所有元素。纠正我如果我在这里错了。

聪明的算法,使用堆。从标题我无法立即想到解决方案。
vizier 的解决方案对我来说看起来不错,除了我假设(尽管你没有说明)这个流可以任意长,所以你不能把所有东西都保存在内存中。是这样吗?
@RunningWild 对于任意长的流,您可以通过使用斐波那契堆(因此您获得 log(N) 删除)并按顺序存储指向插入元素的指针(例如在双端队列中)来获取最后 N 个元素的中位数,然后删除最旧的堆满后每一步的元素(也许也将东西从一个堆移动到另一个堆)。通过存储重复元素的数量(如果有很多重复),你可以得到比 N 更好的结果,但总的来说,如果你想要整个流的中位数,我认为你必须做出某种分布假设。
您可以从两个空堆开始。第一个 int 在一个堆中;第二个进入另一个,或者将第一个项目移动到另一个堆然后插入。这概括为“不允许一个堆大于另一个 +1”,并且不需要特殊的大小写(空堆的“根值”可以定义为 0)
我刚刚在一次 MSFT 采访中得到了这个问题。感谢您的发表

S
Shmil The Cat

有许多不同的解决方案可以从流数据中找到运行中位数,我将在答案的最后简要讨论它们。

问题是关于特定解决方案(最大堆/最小堆解决方案)的详细信息,以及基于堆的解决方案如何工作的解释如下:

对于前两个元素,将较小的元素添加到左侧的 maxHeap 中,将较大的元素添加到右侧的 minHeap 中。然后一一处理流数据,

Step 1: Add next item to one of the heaps

   if next item is smaller than maxHeap root add it to maxHeap,
   else add it to minHeap

Step 2: Balance the heaps (after this step heaps will be either balanced or
   one of them will contain 1 more item)

   if number of elements in one of the heaps is greater than the other by
   more than 1, remove the root element from the one containing more elements and
   add to the other one

然后在任何给定时间,您都可以像这样计算中位数:

   If the heaps contain equal amount of elements;
     median = (root of maxHeap + root of minHeap)/2
   Else
     median = root of the heap with more elements

现在我将按照答案开头所承诺的一般性地讨论这个问题。从数据流中找到运行中位数是一个棘手的问题,而在一般情况下,有效地找到具有内存限制的精确解决方案可能是不可能的。另一方面,如果数据具有我们可以利用的某些特征,我们可以开发有效的专业解决方案。例如,如果我们知道数据是整数类型,那么我们可以使用counting sort,它可以给你一个常量内存常量时间算法。基于堆的解决方案是一种更通用的解决方案,因为它也可以用于其他数据类型(双精度)。最后,如果不需要确切的中位数并且近似值就足够了,您可以尝试估计数据的概率密度函数并使用它估计中位数。


这些堆无限制地增长(即,100 个元素的窗口滑动超过 1000 万个元素将需要 1000 万个元素全部存储在内存中)。请参阅下面的另一个答案,使用可索引的跳过列表,只需要将最近看到的 100 个元素保存在内存中。
正如对问题本身的评论之一所述,您也可以使用堆来获得有限的内存解决方案。
您可以在 c here. 中找到基于堆的解决方案的实现
哇,这不仅帮助我解决了这个特定问题,还帮助我学习了堆这里是我在 python 中的基本实现:github.com/PythonAlgo/DataStruct
@HakanSerce 你能解释一下我们为什么要这么做吗?我的意思是我可以看到这个作品,但我无法直观地理解它。
m
mic

如果输入的方差是统计分布的(例如正态分布、对数正态分布等),则储层采样是从任意长的数字流中估计百分位数/中位数的合理方法。

int n = 0;  // Running count of elements observed so far  
#define SIZE 10000
int reservoir[SIZE];  

while(streamHasData())
{
  int x = readNumberFromStream();

  if (n < SIZE)
  {
       reservoir[n++] = x;
  }         
  else 
  {
      int p = random(++n); // Choose a random number 0 >= p < n
      if (p < SIZE)
      {
           reservoir[p] = x;
      }
  }
}

然后,“水库”是所有输入的运行,统一(公平)样本 - 无论大小。找到中位数(或任何百分位数)是对水库进行分类和轮询感兴趣点的直接问题。

由于水库是固定大小的,因此可以认为排序实际上是 O(1) - 这种方法在恒定时间和内存消耗下运行。


出于好奇,为什么需要方差?
Stream 可能返回小于 SIZE 的元素,让水库半空。计算中位数时应考虑这一点。
有没有办法通过计算差异而不是中位数来加快速度?删除和添加的样本以及之前的中位数是否足够信息?
A
Andrew C

如果您不能一次将所有项目都保存在内存中,那么这个问题就会变得更加困难。堆解决方案要求您一次将所有元素保存在内存中。这在这个问题的大多数实际应用中是不可能的。

相反,当您看到数字时,请跟踪您看到每个整数的次数。假设 4 字节整数,即 2^32 个桶,或最多 2^33 个整数(每个 int 的键和计数),即 2^35 字节或 32GB。它可能会比这少得多,因为您不需要为那些为 0 的条目存储密钥或计数(即,就像 python 中的 defaultdict)。这需要恒定的时间来插入每个新整数。

然后在任何时候,要找到中位数,只需使用计数来确定哪个整数是中间元素。这需要恒定的时间(尽管是一个很大的常数,但仍然是常数)。


如果几乎所有数字都出现一次,那么稀疏列表将占用更多内存。如果你有这么多数字,它们似乎很可能不适合数字,以至于大多数数字会出现一次。尽管如此,这对于大量数字来说是一个聪明的解决方案。
对于稀疏列表,我同意,这在内存方面更糟。虽然如果整数是随机分布的,你会比直觉暗示的更快开始得到重复。请参阅mathworld.wolfram.com/BirthdayProblem.html。所以我敢肯定,只要你有几 GB 的数据,这就会生效。
@AndrewC您能否解释一下如何花费恒定的时间来找到中位数。如果我见过 n 种不同的整数,那么在最坏的情况下,最后一个元素可能是中位数。这使得中位数发现 O(n) 活动。
@shshnk 在这种情况下,n 不是 >>> 2^35 的元素总数吗?
@AndrewC生日问题在这里并不适用——虽然几乎可以保证你仍然会看到很少有重复的均匀分布。
H
Hellblazer

我发现计算流百分位数的最有效方法是 P² 算法:Raj Jain, Imrich Chlamtac: The P² Algorithm for Dynamic Calculation of Quantiiles and Histograms Without Storing Observations. Commun. ACM 28(10): 1076-1085 (1985)

该算法很容易实现并且运行良好。但是,这是一个估计值,因此请记住这一点。从摘要:

提出了一种启发式算法,用于动态计算中位数和其他分位数。在生成观测值时动态生成估计值。不存储观察结果;因此,无论观察次数如何,该算法的存储需求都非常小且固定。这使其非常适合在可用于工业控制器和记录器的分位数芯片中实现。该算法进一步扩展到直方图绘制。分析算法的准确性。


Count-Min Sketch 比 P^2 更好,因为它也给出了错误界限,而后者没有。
还可以考虑 Greenwald 和 Khanna 的“Space-Efficient Online Computation of Quantile Summaries”,它也给出了错误界限并具有良好的内存要求。
此外,对于概率方法,请参阅此博客文章:research.neustar.biz/2013/09/16/…,它所引用的论文在这里:arxiv.org/pdf/1407.1121v1.pdf 这称为“节俭的流式传输”
Frugal Streaming 网站出现故障,这是一个 archive.org 链接:web.archive.org/web/20190430013331/http://research.neustar.biz/…
m
mic

如果我们想找到最近看到的 n 个元素的中位数,这个问题有一个精确的解决方案,只需要将 n 个最近看到的元素保存在内存中。它速度快且扩展性好。

indexable skiplist 支持任意元素的 O(ln n) 插入、删除和索引搜索,同时保持排序顺序。当与跟踪第 n 个最旧条目的 FIFO queue 结合使用时,解决方案很简单:

class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for elem in self.queue:
            self.skiplist.insert(elem)

    def __iter__(self):
        queue = self.queue
        skiplist = self.skiplist
        midpoint = len(queue) // 2
        yield skiplist[midpoint]
        for newelem in self.it:
            oldelem = queue.popleft()
            skiplist.remove(oldelem)
            queue.append(newelem)
            skiplist.insert(newelem)
            yield skiplist[midpoint]

以下是完整工作代码的链接(易于理解的类版本和内联可索引跳过列表代码的优化生成器版本):

http://code.activestate.com/recipes/576930-efficient-running-median-using-an-indexable-skipli/

http://code.activestate.com/recipes/577073 。


如果我理解正确的话,这只会给你最后看到的 N 个元素的中位数,而不是到目前为止的所有元素。不过,这似乎确实是该操作的一个非常巧妙的解决方案。
正确的。答案听起来好像可以通过将最后 n 个元素保留在内存中来找到所有元素的中位数——这通常是不可能的。该算法只找到最后 n 个元素的中位数。
术语“运行中位数”通常用于指代数据子集的中位数。 OP 以非标准方式使用一个通用术语。
S
Sud K

一种直观的思考方式是,如果你有一个完全平衡的二叉搜索树,那么根将是中间元素,因为会有相同数量的更小和更大的元素。现在,如果树没有满,情况就不是这样了,因为最后一层会缺少元素。

所以我们可以做的是有中位数和两棵平衡二叉树,一棵用于小于中位数的元素,一棵用于大于中位数的元素。两棵树必须保持相同的大小。

当我们从数据流中获得一个新整数时,我们将它与中位数进行比较。如果它大于中位数,我们将它添加到正确的树中。如果两棵树的大小相差超过 1,我们删除右树的最小元素,使其成为新的中值,并将旧的中值放入左树中。同样适用于较小的。


你打算怎么做? “我们删除右树的最小元素”
我的意思是二叉搜索树,所以 min 元素一直从根开始。
P
Peteris

高效是一个取决于上下文的词。此问题的解决方案取决于相对于插入量执行的查询量。假设您在对中位数感兴趣的最后插入 N 个数字和 K 次。基于堆的算法的复杂度为 O(N log N + K)。

考虑以下替代方案。插入数组中的数字,并为每个查询运行线性选择算法(例如,使用快速排序枢轴)。现在你有了一个运行时间为 O(KN) 的算法。

现在,如果 K 足够小(不经常查询),则后一种算法实际上更有效,反之亦然。


在堆示例中,查找是恒定时间,所以我认为它应该是 O(N log N + K),但您的观点仍然成立。
是的,好点,将编辑这个。你是对的 N log N 仍然是主要术语。
A
Andrushenko Alexander

这是我的简单但有效的算法(在 C++ 中),用于从整数流中计算运行中位数:

#include<algorithm>
#include<fstream>
#include<vector>
#include<list>

using namespace std;

void runningMedian(std::ifstream& ifs, std::ofstream& ofs, const unsigned bufSize) {
    if (bufSize < 1)
        throw exception("Wrong buffer size.");
    bool evenSize = bufSize % 2 == 0 ? true : false;
    list<int> q;
    vector<int> nums;
    int n;
    unsigned count = 0;
    while (ifs.good()) {
        ifs >> n;
        q.push_back(n);
        auto ub = std::upper_bound(nums.begin(), nums.end(), n);
        nums.insert(ub, n);
        count++;
        if (nums.size() >= bufSize) {
            auto it = std::find(nums.begin(), nums.end(), q.front());
            nums.erase(it);
            q.pop_front();
            if (evenSize)
                ofs << count << ": " << (static_cast<double>(nums[nums.size() / 2 - 1] +
                static_cast<double>(nums[nums.size() / 2]))) / 2.0 << '\n';
            else
                ofs << count << ": " << static_cast<double>(nums[nums.size() / 2]);
        }
    }
}

bufferSize 指定数字序列的大小,必须在其上计算运行中位数。从输入流 ifs 读取数字时,大小为 bufferSize 的向量按排序顺序维护。如果 bufferSize 为奇数,则通过取排序向量的中间值来计算中位数,或者当 bufferSize 为偶数时,取两个中间元素之和除以 2。此外,我维护从输入中读取的最后一个 bufferSize 元素的列表。添加新元素时,我将其放在已排序向量中的正确位置,并从向量中删除之前添加 bufferSize 步骤的元素(保留在列表前面的元素的值)。同时我从列表中删除旧元素:每个新元素都放在列表的后面,每个旧元素都从前面删除。到达 bufferSize 后,列表和向量都停止增长,并且每次插入新元素都会通过删除旧元素来补偿,之前放置在列表 bufferSize 步骤中。请注意,我不关心我是否从向量中删除了元素,之前放置了 bufferSize 步,或者只是一个具有相同值的元素。对于中位数的值,这无关紧要。所有计算的中值都在输出流中输出。


D
Darius Bacon

你不能只用一个堆做这个吗?更新:没有。见评论。

不变式:在读取 2*n 个输入后,最小堆保存其中最大的 n 个。

循环:读取 2 个输入。将它们都添加到堆中,并删除堆的最小值。这重新建立了不变量。

因此,当 2n 输入已被读取时,堆的最小值是第 n 个最大的。需要一些额外的复杂性来平均中间位置周围的两个元素并在奇数个输入后处理查询。


不起作用:您可以丢弃后来证明靠近顶部的东西。例如,尝试使用数字 1 到 100 的算法,但顺序相反:100、99、...、1。
谢谢,泽琳。我很愚蠢地说服自己重新建立了不变量。