Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | """A multi-producer, multi-consumer queue.""" |
2 | ||
3 | from time import time as _time | |
4 | from collections import deque | |
5 | ||
# Public names exported by a star-import of this module.
__all__ = [
    'Empty',
    'Full',
    'Queue',
]
7 | ||
class Empty(Exception):
    """Raised by Queue.get() / Queue.get_nowait() when no item is available.

    This covers both the non-blocking case (the queue is empty right now)
    and the blocking case whose timeout expired before an item arrived.
    """
11 | ||
class Full(Exception):
    """Raised by Queue.put() / Queue.put_nowait() when no free slot is available.

    This covers both the non-blocking case (the queue is full right now)
    and the blocking case whose timeout expired before a slot was freed.
    """
15 | ||
class Queue:
    """A multi-producer, multi-consumer FIFO queue with an optional size bound.

    A single mutex guards the underlying container.  Two condition
    variables share that mutex: ``not_empty`` is notified after an item is
    added, and ``not_full`` is notified after an item is removed.

    Other queue organizations (e.g. a stack or a priority queue) can be
    implemented by overriding the ``_init``, ``_qsize``, ``_empty``,
    ``_full``, ``_put`` and ``_get`` hooks.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        # Fall back to the dummy implementation when the real threading
        # module cannot be imported.
        try:
            import threading
        except ImportError:
            import dummy_threading as threading
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            # The finally clause guarantees the mutex is released even if
            # an overridden hook raises.
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional arg 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, block at most 'timeout' seconds and raise
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        self.not_full.acquire()
        try:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                # Re-check after every wake-up: another producer may have
                # taken the freed slot first.
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional arg 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, block at most 'timeout' seconds and raise
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        self.not_empty.acquire()
        try:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                # Re-check after every wake-up: another consumer may have
                # taken the new item first.
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # Apart from _init (called from __init__ before the mutex exists),
    # the public methods above call these only while holding the mutex.

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full.  ">=" rather than "==" so that a
    # queue holding more than maxsize items still reports full instead of
    # growing without bound.
    def _full(self):
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()