Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'msgpack' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def from_unix(unix_sec):
"""Create a Timestamp from posix timestamp in seconds.
:param unix_float: Posix timestamp in seconds.
:type unix_float: int or float.
"""
seconds = int(unix_sec // 1)
nanoseconds = int((unix_sec % 1) * 10 ** 9)
return Timestamp(seconds, nanoseconds)
def test_slice_with_limit(self):
message = self.request_message('SLICE', ['1', 3])
header, content = self.handler.command(message)
plain_header = msgpack.unpackb(header)
plain_content = msgpack.unpackb(content)
self.assertEqual(plain_header['status'], SUCCESS_STATUS)
self.assertIsInstance(plain_content['datas'], tuple)
self.assertEqual(len(plain_content['datas']), 3)
self.assertEqual(plain_content['datas'][0], ('1', '11'))
self.assertEqual(plain_content['datas'][1], ('2', '12'))
self.assertEqual(plain_content['datas'][2], ('3', '13'))
def test_pack_datetime():
t = Timestamp(42, 14000)
dt = t.to_datetime()
assert dt == datetime.datetime(1970, 1, 1, 0, 0, 42, 14, tzinfo=_utc)
packed = msgpack.packb(dt, datetime=True)
packed2 = msgpack.packb(t)
assert packed == packed2
unpacked = msgpack.unpackb(packed)
print(packed, unpacked)
assert unpacked == t
unpacked = msgpack.unpackb(packed, timestamp=3)
assert unpacked == dt
x = []
packed = msgpack.packb(dt, datetime=False, default=x.append)
assert x
assert x[0] == dt
assert msgpack.unpackb(packed) is None
def _decode(self, payload):
"""
Helper function that decodes data based on the given Encoder.
"""
if isinstance(self.api._encoder, JSONEncoder):
return json.loads(payload)
elif isinstance(self.api._encoder, MsgpackEncoder):
return msgpack.unpackb(payload, encoding='utf-8')
def test_put_of_valid_key(self):
message = self.request_message('PUT', ['a', '1'])
header, content = self.handler.command(message)
plain_header = msgpack.unpackb(header)
plain_content = msgpack.unpackb(content)
self.assertEqual(plain_header['status'], SUCCESS_STATUS)
self.assertEqual(plain_content['datas'], None)
coll.insert(mock_raw_event(1, i))
start_date = 20
end_date = 50
filter = {'sid' : [1]}
args = (coll, filter, start_date, end_date)
cursor = create_pymongo_iterator(*args)
# We filter to only get dt's between 20 and 50
expected = (mock_raw_event(1, i) for i in xrange(20, 51))
# Assert that our iterator returns the expected values.
for cursor_event, expected_event in izip_longest(cursor, expected):
del cursor_event['_id']
# Easiest way to convert unicode to strings.
cursor_event = msgpack.loads(msgpack.dumps(cursor_event))
assert expected_event.keys() == cursor_event.keys()
assert expected_event.values() == cursor_event.values()
def _make_fake_socket(self, family, type):
udp_socket = self.mox.CreateMockAnything()
udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp_socket.bind((self.CONF.collector.udp_address,
self.CONF.collector.udp_port))
def stop_udp(anything):
# Make the loop stop
self.srv.stop()
udp_socket.recvfrom(64 * 1024).WithSideEffects(
stop_udp).AndReturn(
(msgpack.dumps(self.counter),
('127.0.0.1', 12345)))
self.mox.ReplayAll()
return udp_socket
def _send_event(self, event):
if not isinstance(event, dict):
event = self._event_to_dict(event)
# event to a msgpack body message
body = msgpack.Packer().pack(event)
# big endian body len
body_len = struct.pack(">I", len(body))
# first write body length
self._unix_stream_server._connection_socket.sendall(body_len)
# then write body content
self._unix_stream_server._connection_socket.sendall(body)
def setUp(self):
factory = EchoServerFactory(True)
self.proto = factory.buildProtocol(("127.0.0.1", 0))
self.transport = proto_helpers.StringTransport()
self.proto.makeConnection(self.transport)
self.packer = msgpack.Packer(encoding="utf-8")
def testPackUnicode():
test_data = ["", "abcd", ["defgh"], "Русский текст"]
for td in test_data:
re = unpackb(packb(td), use_list=1, raw=False)
assert re == td
packer = Packer()
data = packer.pack(td)
re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack()
assert re == td