Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'influxdb' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def test_influxdb_cache(self):
client = InfluxDBClient(host='localhost', port=8086, username='root', password='root', database='test_cache')
try:
client.drop_database('test_cache')
client.create_database('test_cache')
client.switch_database('test_cache')
with InfluxDBCache(client=DataFrameClient(host='localhost', port=8086, username='root', password='root', database='test_cache')) as cache:
listeners = SyncListeners()
QuandlEvents(listeners)
non_cache_data = get_table([{'datatable_code': 'SHARADAR/SF1', 'ticker': 'AAPL', 'dimension': 'MRY', 'qopts': {"columns": ['dimension', 'ticker', 'datekey', 'revenue']}},
{'datatable_code': 'SHARADAR/SF1', 'ticker': 'IBM', 'dimension': 'MRY', 'qopts': {"columns": ['dimension', 'ticker', 'datekey', 'revenue']}}])
items = list()
for df in non_cache_data['SHARADAR/SF1']:
items.append(df.rename({'revenue': 'value', 'datekey': 'timestamp'}, axis=1).set_index('timestamp'))
def _setup_influxdb_server(inst):
inst.influxd_inst = InfluxDbInstance(
inst.influxdb_template_conf,
udp_enabled=getattr(inst, 'influxdb_udp_enabled', False))
inst.cli = InfluxDBClient('localhost',
inst.influxd_inst.http_port,
'root',
'',
database='db')
if not using_pypy:
inst.cliDF = DataFrameClient('localhost',
inst.influxd_inst.http_port,
'root',
'',
database='db')
def get_handle_db():
global HANDLE_DB
if HANDLE_DB == '':
HANDLE_DB = influxdb.InfluxDBClient(
host=DOCKER_IP,
port=TEST_PORT_INFLUXDB_API,
database=DATABASE_NAME,
username="juniper",
password="juniper"
)
return HANDLE_DB
def test_setup_query_fail(self, mock_client):
"""Test the setup for query failures."""
config = {
'influxdb': {
'host': 'host',
'username': 'user',
'password': 'pass',
}
}
mock_client.return_value.query.side_effect = \
influx_client.exceptions.InfluxDBClientError('fake')
assert not setup_component(self.hass, influxdb.DOMAIN, config)
def test_alter_retention_policy_invalid(self):
self.cli.create_retention_policy('somename', '1d', 1)
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.alter_retention_policy('somename', 'db')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)
rsp = self.cli.get_list_retention_policies()
self.assertEqual(
[{'duration': '0', 'default': True,
'replicaN': 1, 'name': 'default'},
{'duration': '24h0m0s', 'default': False,
'replicaN': 1, 'name': 'somename'}],
rsp
)
def test_query(mock_flux):
db = influxdb.InfluxDBClient(database="fizz")
db.query.side_effect = influxdb.exceptions.InfluxDBClientError(None)
client = InfluxAlchemy(db)
query = client.query(Measurement.new("buzz"))
tools.assert_equal(str(query), "SELECT * FROM buzz;")
def setup_db(self):
try:
self.client.drop_database(self.db_name)
except influxdb.exceptions.InfluxDBClientError:
pass
self.client.create_database(self.db_name)
data = [{
"measurement": series,
"tags": {},
"time": _time,
"fields": {
"value": self.series_values[i],
}
}
for i, series in enumerate(self.series)
for _time in [
(self.end_time - datetime.timedelta(minutes=30)).strftime("%Y-%m-%dT%H:%M:%SZ"),
(self.end_time - datetime.timedelta(minutes=2)).strftime("%Y-%m-%dT%H:%M:%SZ"),
]]
self.assertTrue(self.client.write_points(data))
def test_repr(mock_qry):
mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
db = influxdb.InfluxDBClient(database="example")
client = InfluxAlchemy(db)
query = client.query(Measurement.new("fizz"))
tools.assert_equal(repr(query), "SELECT * FROM fizz;")
def test_event_listener_fail_write(self, mock_client):
"""Test the event listener for write failures."""
self._setup()
state = mock.MagicMock(
state=1, domain='fake', entity_id='entity-id', object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
mock_client.return_value.write_points.side_effect = \
influx_client.exceptions.InfluxDBClientError('foo')
self.handler_method(event)
def test_filter_by(mock_qry):
mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
db = influxdb.InfluxDBClient(database="example")
client = InfluxAlchemy(db)
query = client.query(Measurement.new("fizz")).filter_by(buzz="goo")
tools.assert_equal(str(query), "SELECT * FROM fizz WHERE (buzz = 'goo');")