ChatGPT解决这个技术问题 Extra ChatGPT

Find running median from a stream of integers

Possible Duplicate: Rolling median algorithm in C

Given that integers are read from a data stream. Find median of elements read so far in efficient way.

Solution I have read: We can use a max heap on left side to represent elements that are less than the effective median, and a min heap on right side to represent elements that are greater than the effective median.

After processing an incoming element, the number of elements in heaps differ at most by 1 element. When both heaps contain the same number of elements, we find the average of heap's root data as effective median. When the heaps are not balanced, we select the effective median from the root of heap containing more elements.

But how would we construct a max heap and min heap i.e. how would we know the effective median here? I think that we would insert 1 element in max-heap and then the next 1 element in min-heap, and so on for all the elements. Correct me If I am wrong here.

Clever algorithm, using heaps. From the title I couldn't immediately think of a solution.
vizier's solution looks good to me, except that I was assuming (though you did not state) that this stream could be arbitrarily long, so you couldn't keep everything in memory. Is that the case?
@RunningWild For arbitrarily long streams, you could get the median of the last N elements by using Fibonacci heaps (so you get log(N) deletes) and storing pointers to inserted elements in order (in e.g. a deque), then removing the oldest element at each step once the heaps are full (maybe also moving things from one heap to the other). You could get somewhat better than N by storing the numbers of repeated elements (if there are lots of repeats), but in general, I think you have to make some kind of distributional assumptions if you want the median of the whole stream.
You can start with both heaps empty. First int goes in one heap; second goes either in the other, or you move the first item to the other heap and then insert. This generalizes to "don't allow one heap to go bigger than the other +1" and no special casing is needed (the "root value" of an empty heap can be defined as 0)
I JUST got this question on a MSFT interview. Thank you for posting

S
Shmil The Cat

There are a number of different solutions for finding running median from streamed data, I will briefly talk about them at the very end of the answer.

The question is about the details of the a specific solution (max heap/min heap solution), and how heap based solution works is explained below:

For the first two elements add smaller one to the maxHeap on the left, and bigger one to the minHeap on the right. Then process stream data one by one,

Step 1: Add next item to one of the heaps

   if next item is smaller than maxHeap root add it to maxHeap,
   else add it to minHeap

Step 2: Balance the heaps (after this step heaps will be either balanced or
   one of them will contain 1 more item)

   if number of elements in one of the heaps is greater than the other by
   more than 1, remove the root element from the one containing more elements and
   add to the other one

Then at any given time you can calculate median like this:

   If the heaps contain equal amount of elements;
     median = (root of maxHeap + root of minHeap)/2
   Else
     median = root of the heap with more elements

Now I will talk about the problem in general as promised in the beginning of the answer. Finding running median from a stream of data is a tough problem, and finding an exact solution with memory constraints efficiently is probably impossible for the general case. On the other hand, if the data has some characteristics we can exploit, we can develop efficient specialized solutions. For example, if we know that the data is an integral type, then we can use counting sort, which can give you a constant memory constant time algorithm. Heap based solution is a more general solution because it can be used for other data types (doubles) as well. And finally, if the exact median is not required and an approximation is enough, you can just try to estimate a probability density function for the data and estimate median using that.


These heaps grow without bound (i.e. a 100 element window sliding over 10 million elements would require the 10 million elements to all be stored in memory). See below for another answer using indexable skiplists that only requires the most recently seen 100 elements be kept in memory.
You can have a bounded memory solution using heaps as well, as explained in one of the comments to the question itself.
You can find an implementation of the heap-based solution in c here.
Wow this helped me not only is solving this specific problem but also helped me learn heaps here is my basic implementation in python : github.com/PythonAlgo/DataStruct
@HakanSerce Can you please explain why we did what we did? I mean I can see this works, but I am not able to understand it intuitively.
m
mic

If the variance of the input is statistically distributed (e.g. normal, log-normal, etc.) then reservoir sampling is a reasonable way of estimating percentiles/medians from an arbitrarily long stream of numbers.

int n = 0;  // Running count of elements observed so far  
#define SIZE 10000
int reservoir[SIZE];  

while(streamHasData())
{
  int x = readNumberFromStream();

  if (n < SIZE)
  {
       reservoir[n++] = x;
  }         
  else 
  {
      int p = random(++n); // Choose a random number 0 >= p < n
      if (p < SIZE)
      {
           reservoir[p] = x;
      }
  }
}

"reservoir" is then a running, uniform (fair), sample of all input - regardless of size. Finding the median (or any percentile) is then a straight-forward matter of sorting the reservoir and polling the interesting point.

Since the reservoir is fixed size, the sort can be considered to be effectively O(1) - and this method runs with both constant time and memory consumption.


out of curiosity, why do you need variance?
Stream might return less than SIZE elements letting reservoir half empty. This should be considered when computing median.
Is there is a way to make this faster by calculating the difference instead of the median? Is the removed and added sample and the previous median enough information for that?
A
Andrew C

If you can't hold all the items in memory at once, this problem becomes much harder. The heap solution requires you to hold all the elements in memory at once. This is not possible in most real world applications of this problem.

Instead, as you see numbers, keep track of the count of the number of times you see each integer. Assuming 4 byte integers, that's 2^32 buckets, or at most 2^33 integers (key and count for each int), which is 2^35 bytes or 32GB. It will likely be much less than this because you don't need to store the key or count for those entries that are 0 (ie. like a defaultdict in python). This takes constant time to insert each new integer.

Then at any point, to find the median, just use the counts to determine which integer is the middle element. This takes constant time (albeit a large constant, but constant nonetheless).


If almost all of the numbers are seen once, than a sparse list will take even more memory. And it seems rather likely that if you have so many numbers they don't fit in number that most of the numbers will appear once. Dispite that, this is a clever solution for massive counts of numbers.
For a sparse list, I agree, this is worse in terms of memory. Though if the integers are randomly distributed, you'll start to get duplicates a lot sooner than intuition implies. See mathworld.wolfram.com/BirthdayProblem.html. So I'm pretty sure this will become effective as soon as you have even a few GBs of data.
@AndrewC can you pls explain how it will take constant time to find the median. If I have seen n different kind of integers then in the worst case last element may be the median. This makes median finding O(n) activity.
@shshnk Isn't n the total number of elements which is >>> 2^35 in this case?
@AndrewC The birthday problem doesn't apply much here -- while duplicates will be nearly guaranteed you'll still see very few of them on average for a uniform distribution.
H
Hellblazer

The most efficient way to calculate a percentile of a stream that I have found is the P² algorithm: Raj Jain, Imrich Chlamtac: The P² Algorithm for Dynamic Calculation of Quantiiles and Histograms Without Storing Observations. Commun. ACM 28(10): 1076-1085 (1985)

The algorithm is straight forward to implement and works extremely well. It is an estimate, however, so keep that in mind. From the abstract:

A heuristic algorithm is proposed for dynamic calculation qf the median and other quantiles. The estimates are produced dynamically as the observations are generated. The observations are not stored; therefore, the algorithm has a very small and fixed storage requirement regardless of the number of observations. This makes it ideal for implementing in a quantile chip that can be used in industrial controllers and recorders. The algorithm is further extended to histogram plotting. The accuracy of the algorithm is analyzed.


Count-Min Sketch is better than P^2 in that it also gives error bound while the latter does not.
Also consider "Space-Efficient Online Computation of Quantile Summaries" by Greenwald and Khanna, which also gives error bounds and has good memory requirements.
Also, for a probabilistic approach, see this blog post: research.neustar.biz/2013/09/16/… and the paper that it refers to is here: arxiv.org/pdf/1407.1121v1.pdf This is called "Frugal Streaming"
The Frugal Streaming site went down, here’s an archive.org link: web.archive.org/web/20190430013331/http://research.neustar.biz/…
m
mic

If we want to find the median of the n most recently seen elements, this problem has an exact solution that only needs the n most recently seen elements to be kept in memory. It is fast and scales well.

An indexable skiplist supports O(ln n) insertion, removal, and indexed search of arbitrary elements while maintaining sorted order. When coupled with a FIFO queue that tracks the n-th oldest entry, the solution is simple:

class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for elem in self.queue:
            self.skiplist.insert(elem)

    def __iter__(self):
        queue = self.queue
        skiplist = self.skiplist
        midpoint = len(queue) // 2
        yield skiplist[midpoint]
        for newelem in self.it:
            oldelem = queue.popleft()
            skiplist.remove(oldelem)
            queue.append(newelem)
            skiplist.insert(newelem)
            yield skiplist[midpoint]

Here are links to complete working code (an easy-to-understand class version and an optimized generator version with the indexable skiplist code inlined):

http://code.activestate.com/recipes/576930-efficient-running-median-using-an-indexable-skipli/

http://code.activestate.com/recipes/577073 .


If I'm understanding it correctly though, this only gives you a median of the last N elements seen, not all the elements up to that point. This does seem like a really slick solution for that operation though.
Right. The answer sounds as if it was possible to find the median of all elements by just keeping the last n elements in memory - that's impossible in general. The algorithm just finds the median of the last n elements.
The term "running median" is typically used to refer to the median of a subset of data. The OP is used a common term in a non-standard way.
S
Sud K

An intuitive way to think about this is that if you had a full balanced binary search tree, then the root would be the median element, since there there would be the same number of smaller and greater elements. Now, if the tree isn't full this won't be quite the case since there will be elements missing from the last level.

So what we can do instead is have the median, and two balanced binary trees, one for elements less than the median, and one for elements greater than the median. The two trees must be kept at the same size.

When we get a new integer from the data stream, we compare it to the median. If it's greater than the median, we add it to the right tree. If the two tree sizes differ more than 1, we remove the min element of the right tree, make it the new median, and put the old median in the left tree. Similarly for smaller.


How are you going to do that? "we remove the min element of the right tree"
I meant binary search trees, so the min element is all the way left from the root.
P
Peteris

Efficient is a word that depends on context. The solution to this problem depends on the amount of queries performed relative to the amount of insertions. Suppose you are inserting N numbers and K times towards the end you were interested in the median. The heap based algorithm's complexity would be O(N log N + K).

Consider the following alternative. Plunk the numbers in an array, and for each query, run the linear selection algorithm (using the quicksort pivot, say). Now you have an algorithm with running time O(K N).

Now if K is sufficiently small (infrequent queries), the latter algorithm is actually more efficient and vice versa.


In the heap example, lookup is constant time, so I think it should be O(N log N + K), but your point still holds.
Yes, good point, will edit this out. You're right N log N is still the leading term.
A
Andrushenko Alexander

Here is my simple but efficient algorithm (in C++) for calculating running median from a stream of integers:

#include<algorithm>
#include<fstream>
#include<vector>
#include<list>

using namespace std;

void runningMedian(std::ifstream& ifs, std::ofstream& ofs, const unsigned bufSize) {
    if (bufSize < 1)
        throw exception("Wrong buffer size.");
    bool evenSize = bufSize % 2 == 0 ? true : false;
    list<int> q;
    vector<int> nums;
    int n;
    unsigned count = 0;
    while (ifs.good()) {
        ifs >> n;
        q.push_back(n);
        auto ub = std::upper_bound(nums.begin(), nums.end(), n);
        nums.insert(ub, n);
        count++;
        if (nums.size() >= bufSize) {
            auto it = std::find(nums.begin(), nums.end(), q.front());
            nums.erase(it);
            q.pop_front();
            if (evenSize)
                ofs << count << ": " << (static_cast<double>(nums[nums.size() / 2 - 1] +
                static_cast<double>(nums[nums.size() / 2]))) / 2.0 << '\n';
            else
                ofs << count << ": " << static_cast<double>(nums[nums.size() / 2]);
        }
    }
}

The bufferSize specifies the size of the numbers sequence, on which the running median must be calculated. When reading numbers from the input stream ifs the vector of the size bufferSize is maintained in sorted order. The median is calculated by taking the middle of the sorted vector, if bufferSize is odd, or the sum of the two middle elements divided by 2, when bufferSize is even. Additinally, I maintain a list of last bufferSize elements read from input. When a new element is added, I put it in the right place in sorted vector and remove from the vector the element added bufferSize steps before (the value of the element retained in the front of the list). In the same time I remove the old element from the list: every new element is placed on the back of the list, every old element is removed from the front. After reaching the bufferSize, both the list and the vector stop to grow, and every insertion of a new element is compensated be deletion of an old element, placed in the list bufferSize steps before. Note, I do not care, whether I remove from the vector exactly the element, placed bufferSize steps before, or just an element that has the same value. For the value of median it does not matter. All calculated median values are output in the output stream.


D
Darius Bacon

Can't you do this with just one heap? Update: no. See the comment.

Invariant: After reading 2*n inputs, the min-heap holds the n largest of them.

Loop: Read 2 inputs. Add them both to the heap, and remove the heap's min. This reestablishes the invariant.

So when 2n inputs have been read, the heap's min is the nth largest. There'll need to be a little extra complication to average the two elements around the median position and to handle queries after an odd number of inputs.


Doesn't work: you can drop things that later turn out to be near the top. For instance, try your algorithm with the numbers 1 to 100, but in reverse order: 100, 99, ..., 1.
Thanks, zellyn. Silly of me to convince myself the invariant was reestablished.