You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

130 lines
3.8 KiB

#!/usr/bin/python
import os, sys
import logging
import argparse
import eventlet
import tweepy
import config
from telnetsrv.evtlet import TelnetHandler, command
#from str import join
# Resolve placeholder settings: every twitter_conf entry whose value is the
# literal string 'env' is replaced by the environment variable that has the
# same name as the entry's key.
for key in config.twitter_conf:
    if config.twitter_conf[key] != 'env':
        continue
    try:
        # Single lookup; a missing variable is reported below.
        config.twitter_conf[key] = os.environ[key]
    except KeyError:
        # sys.exit() with a string writes it to stderr and exits with
        # status 1, and (unlike the print statement) parses on Python 2 and 3.
        sys.exit("Please, set the environment variable %s or set it in the config" % key)
# Let the root logger pass everything from DEBUG upwards.
logging.getLogger().setLevel(logging.DEBUG)

# Empty host string: bind on all interfaces.
TELNET_IP_BINDING = ''

# Command line: one required positional integer argument, the listening port.
parser = argparse.ArgumentParser(description='Run a fttp server.')
parser.add_argument(
    'port',
    metavar="PORT",
    type=int,
    help="The port on which to listen on.",
)
console_args = parser.parse_args()
TELNET_PORT_BINDING = console_args.port
class TwitterApi(object):
    '''Thin wrapper around an authenticated tweepy API client.

    The OAuth handler and the client are created once, when the class body
    is executed, from the credentials stored in config.twitter_conf. They
    are class attributes, so every instance shares the same client.
    '''
    auth = tweepy.OAuthHandler(config.twitter_conf['TW_CONSUMER_KEY'], config.twitter_conf['TW_CONSUMER_SECRET'])
    auth.set_access_token(config.twitter_conf['TW_ACCESS_TOKEN'], config.twitter_conf['TW_ACCESS_TOKEN_SECRET'])
    api = tweepy.API(auth)

    def send(self, tweet):
        '''Post `tweet` as a status update through the shared client.'''
        self.api.update_status(tweet)

    def get_tweets_by_tag(self, tag):
        '''Search for `tag` and return the text of the matching tweets.

        The search is issued with count=20. The result is one string in
        which each tweet's text is followed by a newline; it is empty when
        the search yields nothing.
        '''
        feed = self.api.search(tag, count=20)
        # Join once instead of growing a string inside the loop.
        return "".join(item.text + "\n" for item in feed)
class TestTelnetHandler(TelnetHandler):
    '''Telnet session handler that exposes Twitter commands.'''
    # -- Override items to customize the server --
    WELCOME = 'You have connected to the fttp server.'
    PROMPT = "fttp% "
    authNeedUser = False
    authNeedPass = False
    # A single Twitter client shared by every session.
    twitter = TwitterApi()
    # Immutable placeholder only. session_start() rebinds this name to a
    # fresh per-session list, so one session ending never kills the timers
    # that belong to another session.
    get_tweets_event = ()

    def session_start(self):
        '''Called after the user successfully logs in.'''
        # Per-instance list of the pending green threads of this session.
        self.get_tweets_event = []
        message = self.twitter.get_tweets_by_tag("OpenFest2015")
        # Deliver the tweets fetched above to the client after 180 seconds.
        event = eventlet.spawn_after(180, self.writemessage, message)
        self.get_tweets_event.append(event)

    def session_end(self):
        '''Called after the user logs off.'''
        for event in self.get_tweets_event:
            event.kill()
        # Drop the references so the killed green threads can be collected.
        self.get_tweets_event = []

    def writeerror(self, text):
        '''Called to write any error information (like a mistyped command).
        Add a splash of color using ANSI to render the error text in red.
        see http://en.wikipedia.org/wiki/ANSI_escape_code'''
        TelnetHandler.writeerror(self, "\x1b[91m%s\x1b[0m" % text)

    # -- Twitter Commands --
    # NOTE: the docstrings of the command methods below follow the layout
    # telnetsrv appears to use for its help output (parameters on the first
    # line, description after) -- keep that layout when editing them.
    @command(['tweet', 'send'])
    def command_tweet(self, params):
        ''' <message>
        Sends the message as a tweet from @OpenFest_Fttp
        '''
        message = " ".join(params)
        if not message.strip():
            # Nothing to send: report it instead of posting an empty status.
            self.writeerror("Usage: tweet <message>")
            return
        self.twitter.send(message)

    @command(['feed', 'get_tweets'])
    def command_get(self, params):
        ''' <#tag>
        Gets latest tweets by a given tag (Default: #openfest)
        '''
        # Use the tag supplied by the user; fall back to the default when
        # the command is issued without arguments.
        tag = params[0] if params else "#openfest"
        self.feed = self.twitter.get_tweets_by_tag(tag)
        self.writeresponse(self.feed.encode('utf-8').strip())
if __name__ == '__main__':
    Handler = TestTelnetHandler

    class EvtletStreamServer(object):
        '''Small server object that accepts connections with eventlet.

        addr is an (ip, port) pair and handle is the callable passed to
        eventlet.serve for the listening socket.
        '''

        def __init__(self, addr, handle):
            self.ip, self.port = addr[0], addr[1]
            self.handle = handle

        def serve_forever(self):
            '''Open the listening socket and hand it to eventlet.serve.'''
            listener = eventlet.listen((self.ip, self.port))
            eventlet.serve(listener, self.handle)

    # Multi-eventlet server
    bind_address = (TELNET_IP_BINDING, TELNET_PORT_BINDING)
    server = EvtletStreamServer(bind_address, Handler.streamserver_handle)

    logging.info("Starting fttp server at port %d. (Ctrl-C to stop)", TELNET_PORT_BINDING)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the documented way to stop the server.
        logging.info("Server shut down.")