55import time
66import tempfile
77import unittest
8- from objwatch . sinks . consumer import DynamicRoutingConsumer
8+
99from objwatch .sinks .zmq_sink import ZeroMQSink
10+ from objwatch .sinks .consumer import DynamicRoutingConsumer
1011
1112
1213class TestDynamicRoutingConsumer (unittest .TestCase ):
@@ -88,7 +89,7 @@ def test_dynamic_routing_basic(self):
8889 "output_file" : output1 ,
8990 "process_id" : os .getpid (),
9091 }
91-
92+
9293 sink .emit (event1 )
9394 sink .emit (event2 )
9495 sink .emit (event3 )
@@ -114,18 +115,18 @@ def test_dynamic_routing_basic(self):
114115 # Check that at least some messages were received
115116 self .assertTrue (len (content1 ) > 0 , "Output file 1 should contain messages" )
116117 self .assertTrue (len (content2 ) > 0 , "Output file 2 should contain messages" )
117-
118+
118119 # Check for presence of expected messages (may not be all due to ZeroMQ async nature)
119120 if "Message to output1" in content1 :
120121 print ("✓ Received 'Message to output1'" )
121122 else :
122123 print ("✗ Did not receive 'Message to output1' (may be due to ZeroMQ timing)" )
123-
124+
124125 if "Another message to output1" in content1 :
125126 print ("✓ Received 'Another message to output1'" )
126127 else :
127128 print ("✗ Did not receive 'Another message to output1' (may be due to ZeroMQ timing)" )
128-
129+
129130 if "Message to output2" in content2 :
130131 print ("✓ Received 'Message to output2'" )
131132 else :
@@ -172,13 +173,13 @@ def test_file_handle_lru_cache(self):
172173 Test LRU cache for file handles.
173174 """
174175 max_open_files = 3
175-
176+
176177 # Create ZeroMQSink first and bind to endpoint
177178 sink = ZeroMQSink (endpoint = self .endpoint , topic = "" )
178-
179+
179180 # Wait a bit for sink to be ready
180181 time .sleep (0.1 )
181-
182+
182183 consumer = DynamicRoutingConsumer (
183184 endpoint = self .endpoint ,
184185 auto_start = True ,
@@ -226,9 +227,7 @@ def test_consumer_lifecycle(self):
226227 Test proper lifecycle management of DynamicRoutingConsumer.
227228 """
228229 # Create consumer
229- consumer = DynamicRoutingConsumer (
230- endpoint = self .endpoint , auto_start = False , allowed_directories = [self .temp_dir ]
231- )
230+ consumer = DynamicRoutingConsumer (endpoint = self .endpoint , auto_start = False , allowed_directories = [self .temp_dir ])
232231
233232 # Start consumer
234233 consumer .start ()
@@ -284,10 +283,10 @@ def test_process_id_in_output(self):
284283
285284 # Create ZeroMQSink first and bind to endpoint
286285 sink = ZeroMQSink (endpoint = self .endpoint , topic = "" )
287-
286+
288287 # Wait a bit for sink to be ready
289288 time .sleep (0.1 )
290-
289+
291290 # Create and start consumer
292291 consumer = DynamicRoutingConsumer (
293292 endpoint = self .endpoint , auto_start = True , daemon = True , allowed_directories = [self .temp_dir ]
@@ -321,10 +320,10 @@ def test_process_id_in_output(self):
321320 if os .path .exists (output_file ):
322321 with open (output_file , "r" ) as f :
323322 content = f .read ()
324-
323+
325324 # Check that at least some messages were received
326325 self .assertTrue (len (content ) > 0 , "Output file should contain messages" )
327-
326+
328327 # Check for process ID (may not be present if no messages were received)
329328 if "PID:12345" in content :
330329 print ("✓ Process ID found in output" )
0 commit comments